Environment-Specific Throttling: Different Limits for Dev, Staging, and Production
Key Takeaway
Our WSI processor used the same message consumption rate (240 messages) across all environments, overwhelming dev/staging infrastructure while underutilizing production capacity. Implementing environment-specific configuration reduced dev costs by 85% and increased production throughput by 3x.
The Problem
Single configuration for all environments:
# Same limit everywhere!
ECS_MAX_MESSAGES_TO_CONSUME = 240
# Dev cluster (2 instances): Overwhelmed
# Staging cluster (4 instances): Struggling
# Production cluster (20 instances): Underutilized
Issues:
- Dev Overwhelmed: 240 tasks on 2-instance cluster = cascading failures
- High Dev Costs: Running full production workload in dev
- Production Underutilized: Could handle 10x more
- Testing Unreliable: Dev environment constantly failing
- No Flexibility: Couldn't adjust per-environment
The Solution
Environment-specific configuration with intelligent defaults:
import os
from enum import Enum
from dataclasses import dataclass
class Environment(Enum):
    """Supported deployment environments, keyed by their lowercase names."""

    DEV = "dev"                 # small development cluster
    STAGING = "staging"         # pre-production validation
    PRODUCTION = "production"   # live traffic
@dataclass
class EnvironmentConfig:
    """Per-environment tuning knobs for the WSI processor."""

    # SQS messages pulled per polling cycle
    max_messages_to_consume: int
    # Ceiling on simultaneously running ECS tasks
    max_concurrent_tasks: int
    # SQS visibility timeout, in seconds
    visibility_timeout: int
    # ECS task CPU units (1024 = 1 vCPU)
    task_cpu: int
    # ECS task memory, in MiB
    task_memory: int
    # Whether to emit metrics for this environment
    enable_metrics: bool
    # Logging level name, e.g. "DEBUG", "INFO", "WARNING"
    log_level: str
class ConfigManager:
    """Resolve the EnvironmentConfig for the current deployment environment.

    The environment is selected via the ENVIRONMENT variable; unknown or
    missing values fall back to DEV so a misconfigured host gets the most
    conservative limits.
    """

    # Static per-environment tuning table; see EnvironmentConfig for field docs.
    CONFIGS = {
        Environment.DEV: EnvironmentConfig(
            max_messages_to_consume=10,   # Low for dev
            max_concurrent_tasks=2,
            visibility_timeout=600,       # 10 min
            task_cpu=1024,                # 1 vCPU
            task_memory=2048,             # 2 GB
            enable_metrics=False,
            log_level="DEBUG",
        ),
        Environment.STAGING: EnvironmentConfig(
            max_messages_to_consume=50,   # Medium for staging
            max_concurrent_tasks=8,
            visibility_timeout=900,       # 15 min
            task_cpu=2048,                # 2 vCPU
            task_memory=4096,             # 4 GB
            enable_metrics=True,
            log_level="INFO",
        ),
        Environment.PRODUCTION: EnvironmentConfig(
            max_messages_to_consume=200,  # High for production
            max_concurrent_tasks=20,
            visibility_timeout=900,       # 15 min
            task_cpu=2048,                # 2 vCPU
            task_memory=4096,             # 4 GB
            enable_metrics=True,
            log_level="WARNING",
        ),
    }

    @classmethod
    def get_config(cls) -> EnvironmentConfig:
        """Return the configuration for the environment named by $ENVIRONMENT.

        Falls back to the DEV configuration (with a warning) when the
        variable is unset or names an unknown environment.
        """
        # FIX: the original referenced a module-level `logger` that is never
        # defined anywhere in this snippet, raising NameError on first call.
        # A function-scope import keeps this block self-contained.
        import logging
        logger = logging.getLogger(__name__)

        # .strip() tolerates accidental whitespace in the env var value.
        env_name = os.getenv('ENVIRONMENT', 'dev').strip().lower()
        try:
            env = Environment(env_name)
        except ValueError:
            logger.warning(f"Unknown environment '{env_name}', defaulting to dev")
            env = Environment.DEV
        config = cls.CONFIGS[env]
        logger.info(
            f"Loaded config for {env.value}: "
            f"max_messages={config.max_messages_to_consume}, "
            f"max_tasks={config.max_concurrent_tasks}"
        )
        return config
# Usage
# Resolved once at import time, so every Lambda invocation of this container
# reuses the same environment-specific limits.
config = ConfigManager.get_config()

def lambda_handler(event, context):
    """Process queued WSI messages with environment-specific throttling.

    Pulls at most ``config.max_messages_to_consume`` messages from the queue
    named by $QUEUE_URL and launches one ECS task per message.

    NOTE(review): `logger`, `ECSTaskManager`, and `receive_messages` are not
    defined in this snippet — they must be provided elsewhere in the module.
    """
    # Per-environment verbosity (DEBUG in dev, WARNING in production).
    logger.setLevel(config.log_level)
    task_manager = ECSTaskManager(
        cluster_name=f"wsi-processor-{os.getenv('ENVIRONMENT')}",
        task_definition='wsi-processor',
        max_concurrent_tasks=config.max_concurrent_tasks
    )
    # Consume only environment-appropriate number of messages
    messages = receive_messages(
        queue_url=os.getenv('QUEUE_URL'),
        max_messages=config.max_messages_to_consume,
        visibility_timeout=config.visibility_timeout
    )
    logger.info(f"Received {len(messages)} messages (max: {config.max_messages_to_consume})")
    # Process messages
    for message in messages:
        task_manager.launch_task(message)
Implementation Details
Infrastructure as Code per Environment
# pulumi/__main__.py
import json

import pulumi
import pulumi_aws as aws

config = pulumi.Config()
environment = config.require("environment")

# Environment-specific ECS cluster sizing and SQS visibility timeouts.
# FIX: "visibility_timeout" was read below but missing from the original
# table, which raised KeyError for every environment.
cluster_configs = {
    "dev": {"instance_count": 2, "instance_type": "t3.medium", "visibility_timeout": 600},
    "staging": {"instance_count": 4, "instance_type": "t3.large", "visibility_timeout": 900},
    "production": {"instance_count": 20, "instance_type": "t3.xlarge", "visibility_timeout": 900},
}
cluster_config = cluster_configs[environment]

# Create ECS cluster: Spot capacity for non-production keeps costs down;
# production pays for on-demand Fargate reliability.
cluster = aws.ecs.Cluster(
    f"wsi-processor-{environment}",
    name=f"wsi-processor-{environment}",
    capacity_providers=["FARGATE", "FARGATE_SPOT"],
    default_capacity_provider_strategies=[{
        "capacity_provider": "FARGATE_SPOT" if environment != "production" else "FARGATE",
        "weight": 1
    }]
)

# Dead-letter queue for staging/production; dev deliberately runs without one.
# FIX: the original referenced `dlq` without ever creating it.
dlq = None
if environment != "dev":
    dlq = aws.sqs.Queue(
        f"wsi-processing-{environment}-dlq",
        name=f"wsi-processing-{environment}-dlq",
    )

# Environment-specific SQS configuration.
# FIX: a queue ARN is a pulumi Output, so the redrive policy must be built
# inside .apply() — json.dumps() cannot serialize an Output directly.
queue = aws.sqs.Queue(
    f"wsi-processing-{environment}",
    name=f"wsi-processing-{environment}",
    visibility_timeout_seconds=cluster_config["visibility_timeout"],
    # Dev: No DLQ, Staging/Prod: DLQ after 3 retries
    redrive_policy=None if dlq is None else dlq.arn.apply(
        lambda arn: json.dumps({
            "deadLetterTargetArn": arn,
            "maxReceiveCount": 3
        })
    )
)
Dynamic Scaling per Environment
def configure_autoscaling(environment: str):
    """Register an ECS scalable target plus a CPU target-tracking policy
    sized for *environment* ("dev", "staging", or "production")."""
    # Per-environment capacity bounds and CPU targets.
    scaling_configs = {
        "dev": {"min_capacity": 1, "max_capacity": 3, "target_cpu": 80},
        "staging": {"min_capacity": 2, "max_capacity": 8, "target_cpu": 70},
        "production": {"min_capacity": 5, "max_capacity": 50, "target_cpu": 60},
    }
    cfg = scaling_configs[environment]

    resource_id = f'service/wsi-processor-{environment}/wsi-processor'
    autoscaling = boto3.client('application-autoscaling')

    # Declare the service's desired-count as a scalable dimension.
    autoscaling.register_scalable_target(
        ServiceNamespace='ecs',
        ResourceId=resource_id,
        ScalableDimension='ecs:service:DesiredCount',
        MinCapacity=cfg["min_capacity"],
        MaxCapacity=cfg["max_capacity"],
    )

    # Track average CPU: scale out quickly (60 s), scale in slowly (300 s).
    autoscaling.put_scaling_policy(
        PolicyName=f'wsi-processor-{environment}-cpu-scaling',
        ServiceNamespace='ecs',
        ResourceId=resource_id,
        ScalableDimension='ecs:service:DesiredCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': cfg["target_cpu"],
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'ECSServiceAverageCPUUtilization'
            },
            'ScaleInCooldown': 300,
            'ScaleOutCooldown': 60,
        },
    )
Cost Optimization for Non-Production
# Use Fargate Spot for dev/staging (70% cost savings)
def get_capacity_provider(environment: str) -> str:
    """Return the ECS capacity provider name for *environment*.

    Production runs on regular Fargate for reliability; every other
    environment uses Fargate Spot for ~70% cost savings.
    """
    return "FARGATE" if environment == "production" else "FARGATE_SPOT"
# Shutdown non-production environments at night
def create_scheduled_scaling(environment: str):
    """Install night-time scale-down schedules for non-production services.

    Non-production capacity is forced to zero at 18:00 UTC and restored
    (min 2 / max 8) at 08:00 UTC. Production is left untouched.
    """
    if environment == "production":
        return  # production runs around the clock

    autoscaling = boto3.client('application-autoscaling')
    resource_id = f'service/wsi-processor-{environment}/wsi-processor'

    # (action name, cron schedule, capacity bounds)
    schedules = [
        (f'scale-down-{environment}', 'cron(0 18 * * ? *)',   # 6 PM UTC
         {'MinCapacity': 0, 'MaxCapacity': 0}),
        (f'scale-up-{environment}', 'cron(0 8 * * ? *)',      # 8 AM UTC
         {'MinCapacity': 2, 'MaxCapacity': 8}),
    ]
    for action_name, cron_expr, capacity in schedules:
        autoscaling.put_scheduled_action(
            ServiceNamespace='ecs',
            ScheduledActionName=action_name,
            ResourceId=resource_id,
            ScalableDimension='ecs:service:DesiredCount',
            Schedule=cron_expr,
            ScalableTargetAction=capacity,
        )
Impact and Results
| Metric | Dev | Staging | Production |
|--------|-----|---------|------------|
| **Before** | | | |
| Max messages consumed | 240 | 240 | 240 |
| Task failures | 67% | 34% | 8% |
| Monthly cost | $890 | $1,240 | $3,450 |
| **After** | | | |
| Max messages consumed | 10 | 50 | 200 |
| Task failures | 2% | 1% | 0.5% |
| Monthly cost | $130 | $680 | $4,200 |
| **Improvement** | | | |
| Failure reduction | 96% | 97% | 94% |
| Cost change | -85% | -45% | +22%* |
*Production cost increased because we're actually utilizing capacity
Lessons Learned
- One Size Doesn't Fit All: Different environments need different configs
- Right-Size Non-Production: Dev doesn't need production capacity
- Use Spot for Dev/Staging: 70% cost savings with acceptable interruption
- Schedule Scaling: Shut down non-production environments when not in use
- Monitor Per-Environment: Track metrics separately for each environment