Environment-Specific Throttling: Different Limits for Dev, Staging, and Production

Key Takeaway

Our WSI processor used the same message consumption limit (240 messages) across all environments, overwhelming dev/staging infrastructure while underutilizing production capacity. Implementing environment-specific configuration reduced dev costs by 85% and increased production throughput by 3x.

The Problem

Single configuration for all environments:

# Same limit everywhere!
ECS_MAX_MESSAGES_TO_CONSUME = 240

# Dev cluster (2 instances): Overwhelmed
# Staging cluster (4 instances): Struggling
# Production cluster (20 instances): Underutilized

Issues:

  1. Dev Overwhelmed: 240 tasks on a 2-instance cluster = cascading failures
  2. High Dev Costs: Running the full production workload in dev
  3. Production Underutilized: Could handle 10x more
  4. Testing Unreliable: Dev environment constantly failing
  5. No Flexibility: Couldn't adjust limits per environment

The Solution

Environment-specific configuration with intelligent defaults:

import logging
import os
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)

class Environment(Enum):
    """Deployment environments"""
    DEV = "dev"
    STAGING = "staging"
    PRODUCTION = "production"

@dataclass
class EnvironmentConfig:
    """Environment-specific configuration"""
    max_messages_to_consume: int
    max_concurrent_tasks: int
    visibility_timeout: int
    task_cpu: int
    task_memory: int
    enable_metrics: bool
    log_level: str

class ConfigManager:
    """Manage environment-specific configurations"""

    CONFIGS = {
        Environment.DEV: EnvironmentConfig(
            max_messages_to_consume=10,  # Low for dev
            max_concurrent_tasks=2,
            visibility_timeout=600,  # 10 min
            task_cpu=1024,  # 1 vCPU
            task_memory=2048,  # 2 GB
            enable_metrics=False,
            log_level="DEBUG"
        ),
        Environment.STAGING: EnvironmentConfig(
            max_messages_to_consume=50,  # Medium for staging
            max_concurrent_tasks=8,
            visibility_timeout=900,  # 15 min
            task_cpu=2048,  # 2 vCPU
            task_memory=4096,  # 4 GB
            enable_metrics=True,
            log_level="INFO"
        ),
        Environment.PRODUCTION: EnvironmentConfig(
            max_messages_to_consume=200,  # High for production
            max_concurrent_tasks=20,
            visibility_timeout=900,  # 15 min
            task_cpu=2048,  # 2 vCPU
            task_memory=4096,  # 4 GB
            enable_metrics=True,
            log_level="WARNING"
        )
    }

    @classmethod
    def get_config(cls) -> EnvironmentConfig:
        """Get configuration for current environment"""

        env_name = os.getenv('ENVIRONMENT', 'dev').lower()

        try:
            env = Environment(env_name)
        except ValueError:
            logger.warning(f"Unknown environment '{env_name}', defaulting to dev")
            env = Environment.DEV

        config = cls.CONFIGS[env]

        logger.info(
            f"Loaded config for {env.value}: "
            f"max_messages={config.max_messages_to_consume}, "
            f"max_tasks={config.max_concurrent_tasks}"
        )

        return config

# Usage
config = ConfigManager.get_config()

def lambda_handler(event, context):
    """Process messages with environment-specific throttling"""

    logger.setLevel(config.log_level)

    task_manager = ECSTaskManager(
        cluster_name=f"wsi-processor-{os.getenv('ENVIRONMENT')}",
        task_definition='wsi-processor',
        max_concurrent_tasks=config.max_concurrent_tasks
    )

    # Consume only environment-appropriate number of messages
    messages = receive_messages(
        queue_url=os.getenv('QUEUE_URL'),
        max_messages=config.max_messages_to_consume,
        visibility_timeout=config.visibility_timeout
    )

    logger.info(f"Received {len(messages)} messages (max: {config.max_messages_to_consume})")

    # Process messages
    for message in messages:
        task_manager.launch_task(message)

Implementation Details

Infrastructure as Code per Environment

# pulumi/__main__.py
import json

import pulumi
import pulumi_aws as aws

config = pulumi.Config()
environment = config.require("environment")

# Environment-specific ECS cluster sizing and queue settings
cluster_configs = {
    "dev": {"instance_count": 2, "instance_type": "t3.medium", "visibility_timeout": 600},
    "staging": {"instance_count": 4, "instance_type": "t3.large", "visibility_timeout": 900},
    "production": {"instance_count": 20, "instance_type": "t3.xlarge", "visibility_timeout": 900}
}

cluster_config = cluster_configs[environment]

# Create ECS cluster
cluster = aws.ecs.Cluster(
    f"wsi-processor-{environment}",
    name=f"wsi-processor-{environment}",
    capacity_providers=["FARGATE", "FARGATE_SPOT"],
    default_capacity_provider_strategies=[{
        "capacity_provider": "FARGATE_SPOT" if environment != "production" else "FARGATE",
        "weight": 1
    }]
)

# Dead-letter queue only for staging/production
dlq = None
if environment != "dev":
    dlq = aws.sqs.Queue(
        f"wsi-processing-dlq-{environment}",
        name=f"wsi-processing-dlq-{environment}"
    )

# Environment-specific SQS configuration
queue = aws.sqs.Queue(
    f"wsi-processing-{environment}",
    name=f"wsi-processing-{environment}",
    visibility_timeout_seconds=cluster_config["visibility_timeout"],

    # Dev: no DLQ; Staging/Prod: DLQ after 3 retries
    redrive_policy=None if dlq is None else dlq.arn.apply(
        lambda arn: json.dumps({
            "deadLetterTargetArn": arn,
            "maxReceiveCount": 3
        })
    )
)
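
The Lambda handler shown earlier reads ENVIRONMENT and QUEUE_URL from its environment, but the snippets above don't show where those values come from. A minimal sketch of that wiring, assuming an existing lambda_role and a ./dispatcher package (both hypothetical names, not part of the original setup):

# Hypothetical: expose the selected environment and queue URL to the dispatcher
# Lambda so ConfigManager.get_config() picks the right limits at runtime.
dispatcher = aws.lambda_.Function(
    f"wsi-dispatcher-{environment}",
    runtime="python3.11",
    handler="handler.lambda_handler",
    code=pulumi.FileArchive("./dispatcher"),
    role=lambda_role.arn,  # assumed IAM role, defined elsewhere
    timeout=60,
    environment=aws.lambda_.FunctionEnvironmentArgs(
        variables={
            "ENVIRONMENT": environment,
            "QUEUE_URL": queue.url,
        }
    ),
)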

Dynamic Scaling per Environment

import boto3

def configure_autoscaling(environment: str):
    """Configure autoscaling based on environment"""

    autoscaling = boto3.client('application-autoscaling')

    scaling_configs = {
        "dev": {
            "min_capacity": 1,
            "max_capacity": 3,
            "target_cpu": 80
        },
        "staging": {
            "min_capacity": 2,
            "max_capacity": 8,
            "target_cpu": 70
        },
        "production": {
            "min_capacity": 5,
            "max_capacity": 50,
            "target_cpu": 60
        }
    }

    config = scaling_configs[environment]

    # Register scalable target
    autoscaling.register_scalable_target(
        ServiceNamespace='ecs',
        ResourceId=f'service/wsi-processor-{environment}/wsi-processor',
        ScalableDimension='ecs:service:DesiredCount',
        MinCapacity=config["min_capacity"],
        MaxCapacity=config["max_capacity"]
    )

    # Create scaling policy
    autoscaling.put_scaling_policy(
        PolicyName=f'wsi-processor-{environment}-cpu-scaling',
        ServiceNamespace='ecs',
        ResourceId=f'service/wsi-processor-{environment}/wsi-processor',
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': config["target_cpu"],
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'ECSServiceAverageCPUUtilization'
            },
            'ScaleInCooldown': 300,
            'ScaleOutCooldown': 60
        }
    )

Cost Optimization for Non-Production

# Use Fargate Spot for dev/staging (70% cost savings)
def get_capacity_provider(environment: str) -> str:
    """Get appropriate capacity provider for environment"""

    if environment == "production":
        return "FARGATE"  # Regular Fargate for reliability
    else:
        return "FARGATE_SPOT"  # Spot for cost savings

# Shutdown non-production environments at night
def create_scheduled_scaling(environment: str):
    """Create scheduled scaling for non-production environments"""

    if environment == "production":
        return  # Don't schedule production

    autoscaling = boto3.client('application-autoscaling')

    # Scale down at 6 PM
    autoscaling.put_scheduled_action(
        ServiceNamespace='ecs',
        ScheduledActionName=f'scale-down-{environment}',
        ResourceId=f'service/wsi-processor-{environment}/wsi-processor',
        ScalableDimension='ecs:service:DesiredCount',
        Schedule='cron(0 18 * * ? *)',  # 6 PM UTC
        ScalableTargetAction={'MinCapacity': 0, 'MaxCapacity': 0}
    )

    # Scale up at 8 AM
    autoscaling.put_scheduled_action(
        ServiceNamespace='ecs',
        ScheduledActionName=f'scale-up-{environment}',
        ResourceId=f'service/wsi-processor-{environment}/wsi-processor',
        ScalableDimension='ecs:service:DesiredCount',
        Schedule='cron(0 8 * * ? *)',  # 8 AM UTC
        ScalableTargetAction={'MinCapacity': 2, 'MaxCapacity': 8}
    )
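
One way to wire these helpers is to invoke them once per deployment; a minimal sketch, assuming the deploy pipeline exports ENVIRONMENT:

import os

# Hypothetical deploy-time hook: apply scaling and schedules for the target environment
env = os.getenv("ENVIRONMENT", "dev")
configure_autoscaling(env)
create_scheduled_scaling(env)  # no-op for production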

Impact and Results

| Metric | Dev | Staging | Production |
|--------|-----|---------|------------|
| Before | | | |
| Max messages consumed | 240 | 240 | 240 |
| Task failures | 67% | 34% | 8% |
| Monthly cost | $890 | $1,240 | $3,450 |
| After | | | |
| Max messages consumed | 10 | 50 | 200 |
| Task failures | 2% | 1% | 0.5% |
| Monthly cost | $130 | $680 | $4,200 |
| Improvement | | | |
| Failure reduction | 96% | 97% | 94% |
| Cost change | -85% | -45% | +22%* |

*Production cost increased because previously idle capacity is now actually being used

Lessons Learned

  1. One Size Doesn't Fit All: Different environments need different configs
  2. Right-Size Non-Production: Dev doesn't need production capacity
  3. Use Spot for Dev/Staging: 70% cost savings with acceptable interruption
  4. Schedule Scaling: Shut down non-production environments when not in use
  5. Monitor Per-Environment: Track metrics separately for each environment (see the sketch below)
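
The EnvironmentConfig above carries an enable_metrics flag that the handler never actually consults. A minimal sketch of per-environment metric emission, assuming CloudWatch and a hypothetical WSIProcessor namespace:

import os
import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_environment_metric(metric_name: str, value: float, config: EnvironmentConfig):
    """Publish a metric tagged with the current environment (skipped in dev)."""

    if not config.enable_metrics:
        return

    cloudwatch.put_metric_data(
        Namespace="WSIProcessor",  # hypothetical namespace
        MetricData=[{
            "MetricName": metric_name,
            "Value": value,
            "Unit": "Count",
            # Separate Environment dimension keeps dev/staging/production dashboards apart
            "Dimensions": [
                {"Name": "Environment", "Value": os.getenv("ENVIRONMENT", "dev")}
            ]
        }]
    )

# Usage inside the handler, e.g.:
# publish_environment_metric("MessagesConsumed", len(messages), config)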