ECS Concurrency and Resource Management: Preventing Task Thrashing
Key Takeaway
Our ECS cluster launched too many WSI processing tasks concurrently, exhausting memory and causing cascading failures as tasks competed for limited resources. Implementing concurrency limits and resource reservations reduced task failure rate from 45% to 2% and improved throughput by 3x.
The Problem
We consumed messages from SQS without limiting concurrent ECS tasks:
# Lambda that triggers ECS tasks
def lambda_handler(event, context):
    # ANTI-PATTERN (kept for illustration): launches one Fargate task per
    # SQS record with no check on how many tasks are already running.
    for record in event['Records']:
        # Launch ECS task for each message
        # No concurrency control!
        ecs.run_task(
            cluster='wsi-processor',
            taskDefinition='wsi-processor',
            launchType='FARGATE'
        )
        # Cluster gets overwhelmed!
Issues:
- Resource Exhaustion: 50+ tasks launched simultaneously
- Memory Thrashing: Tasks killed for OOM
- CPU Starvation: Tasks took 5x longer due to contention
- Cascading Failures: Failed tasks requeued, making problem worse
- High Costs: Running many slow tasks more expensive than few fast tasks
The Solution
Implemented concurrency limits and intelligent task management:
import json
import logging
import os
from typing import Optional

import boto3

logger = logging.getLogger(__name__)


class ECSTaskManager:
    """Manage ECS task concurrency and resources.

    Caps the number of concurrently RUNNING tasks for one task family so the
    cluster is never oversubscribed. Note the check-then-launch sequence is
    not atomic: concurrent Lambda invocations can briefly exceed the cap.
    """

    def __init__(
        self,
        cluster_name: str,
        task_definition: str,
        max_concurrent_tasks: int = 10
    ):
        """Create a manager for *cluster_name* / *task_definition*.

        max_concurrent_tasks: hard ceiling on simultaneously RUNNING tasks.
        """
        self.cluster_name = cluster_name
        self.task_definition = task_definition
        self.max_concurrent_tasks = max_concurrent_tasks
        self.ecs = boto3.client('ecs')

    def get_running_task_count(self) -> int:
        """Get count of currently running tasks for this task family."""
        # list_tasks returns at most 100 ARNs per page; that is fine while
        # max_concurrent_tasks stays well below 100 — paginate if it grows.
        response = self.ecs.list_tasks(
            cluster=self.cluster_name,
            desiredStatus='RUNNING',
            family=self.task_definition.split(':')[0]  # family without revision
        )
        return len(response.get('taskArns', []))

    def can_launch_task(self) -> bool:
        """Check if we can launch another task without exceeding the cap."""
        current_count = self.get_running_task_count()
        logger.info(
            f"Current tasks: {current_count}/{self.max_concurrent_tasks}"
        )
        return current_count < self.max_concurrent_tasks

    def launch_task(self, message: dict) -> Optional[str]:
        """Launch an ECS task for one SQS message.

        Returns the task ARN, or None when at the concurrency limit or when
        ECS could not place the task.
        """
        if not self.can_launch_task():
            logger.warning(
                f"At max concurrency ({self.max_concurrent_tasks}), "
                f"skipping task launch"
            )
            return None

        # Message body is expected to carry the S3 object to process.
        # NOTE(review): assumes keys 'bucket' and 'key' — confirm producer.
        body = json.loads(message['Body'])

        # Launch task with resource constraints
        response = self.ecs.run_task(
            cluster=self.cluster_name,
            taskDefinition=self.task_definition,
            launchType='FARGATE',
            platformVersion='1.4.0',
            # Resource requirements
            overrides={
                'containerOverrides': [{
                    'name': 'wsi-processor',
                    'environment': [
                        {'name': 'S3_BUCKET', 'value': body['bucket']},
                        {'name': 'S3_KEY', 'value': body['key']},
                        {'name': 'SQS_RECEIPT_HANDLE', 'value': message['ReceiptHandle']}
                    ],
                    # Set CPU and memory
                    'cpu': 2048,  # 2 vCPU
                    'memory': 4096  # 4 GB
                }]
            },
            # Network configuration
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': [os.getenv('SUBNET_ID')],
                    'securityGroups': [os.getenv('SECURITY_GROUP_ID')],
                    'assignPublicIp': 'ENABLED'
                }
            },
            # Tags for tracking
            tags=[
                {'key': 'Source', 'value': 'SQS'},
                {'key': 'S3Key', 'value': body['key']},
                {'key': 'MessageId', 'value': message['MessageId']}
            ]
        )

        # run_task reports capacity/placement problems in 'failures' rather
        # than raising; indexing tasks[0] unconditionally crashed on them.
        failures = response.get('failures')
        if failures or not response.get('tasks'):
            logger.error("run_task failed for %s: %s", body.get('key'), failures)
            return None

        task_arn = response['tasks'][0]['taskArn']
        logger.info(f"Launched task: {task_arn}")
        return task_arn
def lambda_handler(event, context):
    """Process SQS messages, launching ECS tasks under a concurrency cap."""
    manager = ECSTaskManager(
        cluster_name='wsi-processor-cluster',
        task_definition='wsi-processor:latest',
        max_concurrent_tasks=10  # Limit concurrent tasks
    )

    # launch_task returns None for every message it declines.
    outcomes = [manager.launch_task(record) for record in event['Records']]
    launched = sum(1 for arn in outcomes if arn)
    skipped = len(outcomes) - launched

    logger.info(f"Launched: {launched}, Skipped: {skipped}")

    # Surface counts so CloudWatch can track skipped launches.
    return {
        'launched': launched,
        'skipped': skipped,
        'running_tasks': manager.get_running_task_count()
    }
Implementation Details
Dynamic Concurrency Based on Cluster Capacity
def calculate_optimal_concurrency(
    self,
    task_cpu: int = 2048,
    task_memory: int = 4096,
    headroom: float = 0.8
) -> int:
    """Calculate optimal concurrency based on cluster resources.

    task_cpu / task_memory: per-task requirements (CPU units / MB); the
    previous hard-coded 2048/4096 values are now the defaults.
    headroom: fraction of capacity to actually use (default 80%).
    """
    # Get cluster capacity
    response = self.ecs.describe_clusters(clusters=[self.cluster_name])
    cluster = response['clusters'][0]

    instance_count = cluster.get('registeredContainerInstancesCount', 0)
    # NOTE(review): assumes each container instance contributes 2048 CPU
    # units and 4096 MB — confirm against the actual instance type.
    registered_cpu = instance_count * 2048
    registered_memory = instance_count * 4096

    # Task count is bounded by whichever resource runs out first.
    tasks_by_cpu = registered_cpu // task_cpu
    tasks_by_memory = registered_memory // task_memory
    optimal = min(tasks_by_cpu, tasks_by_memory)

    # Leave headroom so the cluster is never driven to 100%.
    return int(optimal * headroom)
SQS Integration with Throttling
# Configure SQS to throttle based on ECS capacity
sqs = boto3.client('sqs')

def configure_sqs_throttling(queue_url: str, max_tasks: int):
    """Tune SQS queue attributes to work with the ECS concurrency limit.

    NOTE(review): max_tasks is currently unused — none of the attributes set
    below depend on it. Either wire it in (e.g. via the consumer's
    MaxNumberOfMessages on receive) or drop the parameter.
    """
    # Visibility timeout should exceed the expected processing time so a
    # message is not redelivered while its task is still running.
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            # 15 minutes visibility
            'VisibilityTimeout': '900',
            # Long polling: wait up to 20 s for messages per receive call
            'ReceiveMessageWaitTimeSeconds': '20',
            # Maximum message *size* in bytes (256 KB), not messages per receive
            'MaximumMessageSize': '262144'
        }
    )
# Lambda reserved concurrency: limits how many instances of the launcher
# Lambda can run at once, which indirectly caps the rate of ECS task
# launches (each instance launches at most one batch at a time).
lambda_client = boto3.client('lambda')
lambda_client.put_function_concurrency(
    FunctionName='wsi-processor-launcher',
    ReservedConcurrentExecutions=5  # Max 5 Lambda instances
)
CloudWatch Alarms for Capacity
cloudwatch = boto3.client('cloudwatch')

def create_capacity_alarms():
    """Create CloudWatch alarms for ECS capacity issues."""
    # Settings shared by both alarms: trigger after two evaluation periods
    # above threshold and notify the configured SNS topic.
    shared = {
        'ComparisonOperator': 'GreaterThanThreshold',
        'EvaluationPeriods': 2,
        'Statistic': 'Average',
        'ActionsEnabled': True,
        'AlarmActions': [os.getenv('SNS_TOPIC_ARN')],
    }

    # Fires when the running-task count approaches the cluster's limit.
    cloudwatch.put_metric_alarm(
        AlarmName='WSI-Processor-High-Task-Count',
        MetricName='RunningTaskCount',
        Namespace='AWS/ECS',
        Period=60,
        Threshold=15,
        Dimensions=[
            {'Name': 'ClusterName', 'Value': 'wsi-processor-cluster'}
        ],
        **shared
    )

    # Fires when the custom task-failure-rate metric exceeds 10%.
    cloudwatch.put_metric_alarm(
        AlarmName='WSI-Processor-High-Failure-Rate',
        MetricName='TaskFailureRate',
        Namespace='WSI/Processing',
        Period=300,
        Threshold=0.1,
        **shared
    )
Impact and Results
| Metric | Before | After | Improvement |
|--------|--------|-------|-------------|
| Task failure rate | 45% | 2% | 96% reduction |
| Average processing time | 18 min | 6 min | 3x faster |
| Throughput (slides/hour) | 12 | 35 | 2.9x improvement |
| Cost per slide | $0.85 | $0.32 | 62% savings |
| OOM errors | 89/day | 2/day | 98% reduction |
Concurrency optimization:
- Previous: 50 concurrent tasks, 45% failure rate
- Optimized: 10 concurrent tasks, 2% failure rate
- Sweet spot: 8-12 tasks depending on slide size
Lessons Learned
- More != Faster: Excessive concurrency causes resource contention
- Measure Capacity: Know your cluster limits before setting concurrency
- Leave Headroom: Use 80% of capacity, not 100%
- Monitor Task Count: Alert when approaching limits
- Right-Size Tasks: Match CPU/memory to actual usage patterns