← Back

ECS Concurrency and Resource Management: Preventing Task Thrashing

·wsi-processor

ECS Concurrency and Resource Management: Preventing Task Thrashing

Key Takeaway

Our ECS cluster launched too many WSI processing tasks concurrently, exhausting memory and causing cascading failures as tasks competed for limited resources. Implementing concurrency limits and resource reservations reduced task failure rate from 45% to 2% and improved throughput by 3x.

The Problem

We consumed messages from SQS without limiting concurrent ECS tasks:

# ANTI-PATTERN (kept for illustration): Lambda that blindly launches one
# ECS task per SQS record.
def lambda_handler(event, context):
    """Launch an ECS task per SQS record with no back-pressure — the bug."""
    for record in event['Records']:
        # Launch ECS task for each message. Nothing checks the cluster's
        # current load, so a burst of messages becomes a burst of tasks.
        # No concurrency control!
        ecs.run_task(
            cluster='wsi-processor',
            taskDefinition='wsi-processor',
            launchType='FARGATE'
        )
        # Cluster gets overwhelmed!

Issues:

  1. Resource Exhaustion: 50+ tasks launched simultaneously
  2. Memory Thrashing: Tasks killed for OOM
  3. CPU Starvation: Tasks took 5x longer due to contention
  4. Cascading Failures: Failed tasks requeued, making problem worse
  5. High Costs: Running many slow tasks more expensive than few fast tasks

The Solution

Implemented concurrency limits and intelligent task management:

import json
import logging
import os
from typing import Optional

import boto3

class ECSTaskManager:
    """Launch ECS tasks from an SQS-triggered Lambda with a concurrency cap.

    Keeps the number of RUNNING tasks at or below ``max_concurrent_tasks``
    by checking the live task count before each launch. The check-then-launch
    sequence is not atomic, so several concurrent Lambda instances can still
    briefly overshoot the cap; pair this with reserved concurrency on the
    Lambda itself.
    """

    def __init__(
        self,
        cluster_name: str,
        task_definition: str,
        max_concurrent_tasks: int = 10
    ):
        # cluster_name: ECS cluster to launch into.
        # task_definition: task definition family, or 'family:revision'.
        # max_concurrent_tasks: hard cap on simultaneously RUNNING tasks.
        self.cluster_name = cluster_name
        self.task_definition = task_definition
        self.max_concurrent_tasks = max_concurrent_tasks
        self.ecs = boto3.client('ecs')

    def get_running_task_count(self) -> int:
        """Return the number of RUNNING tasks for this task family."""

        # list_tasks filters by family name, so strip any ':revision' suffix.
        response = self.ecs.list_tasks(
            cluster=self.cluster_name,
            desiredStatus='RUNNING',
            family=self.task_definition.split(':')[0]
        )

        return len(response.get('taskArns', []))

    def can_launch_task(self) -> bool:
        """Return True if the running-task count is below the cap."""

        current_count = self.get_running_task_count()

        logger.info(
            "Current tasks: %s/%s", current_count, self.max_concurrent_tasks
        )

        return current_count < self.max_concurrent_tasks

    def launch_task(self, message: dict) -> Optional[str]:
        """Launch one ECS task for an SQS record; return its ARN or None.

        ``message`` is a record from a Lambda SQS event, whose keys are
        lowerCamelCase ('body', 'receiptHandle', 'messageId') — NOT the
        'Body'/'ReceiptHandle' casing returned by sqs.receive_message.

        Returns None when at the concurrency cap or when ECS reports a
        launch failure instead of a task.
        """

        if not self.can_launch_task():
            logger.warning(
                "At max concurrency (%s), skipping task launch",
                self.max_concurrent_tasks
            )
            return None

        # BUG FIX: Lambda SQS event records use lowerCamelCase keys;
        # the previous 'Body'/'ReceiptHandle'/'MessageId' raised KeyError.
        body = json.loads(message['body'])

        # Launch task with resource constraints
        response = self.ecs.run_task(
            cluster=self.cluster_name,
            taskDefinition=self.task_definition,
            launchType='FARGATE',
            platformVersion='1.4.0',

            # Resource requirements
            overrides={
                'containerOverrides': [{
                    'name': 'wsi-processor',
                    'environment': [
                        {'name': 'S3_BUCKET', 'value': body['bucket']},
                        {'name': 'S3_KEY', 'value': body['key']},
                        {'name': 'SQS_RECEIPT_HANDLE', 'value': message['receiptHandle']}
                    ],
                    # Set CPU and memory
                    'cpu': 2048,  # 2 vCPU
                    'memory': 4096  # 4 GB
                }]
            },

            # Network configuration
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': [os.getenv('SUBNET_ID')],
                    'securityGroups': [os.getenv('SECURITY_GROUP_ID')],
                    'assignPublicIp': 'ENABLED'
                }
            },

            # Tags for tracking
            tags=[
                {'key': 'Source', 'value': 'SQS'},
                {'key': 'S3Key', 'value': body['key']},
                {'key': 'MessageId', 'value': message['messageId']}
            ]
        )

        # BUG FIX: run_task can return zero tasks plus a 'failures' list
        # (e.g. capacity or subnet problems); the previous code raised
        # IndexError here instead of surfacing the failure.
        if not response.get('tasks'):
            logger.error("run_task failed: %s", response.get('failures'))
            return None

        task_arn = response['tasks'][0]['taskArn']
        logger.info("Launched task: %s", task_arn)

        return task_arn

def lambda_handler(event, context):
    """Process SQS messages with concurrency control.

    Returns launch statistics plus 'batchItemFailures' for the records we
    could not launch. Reporting skipped records as partial-batch failures
    makes SQS re-deliver just those messages after the visibility timeout,
    instead of Lambda deleting them on a successful return (requires
    ReportBatchItemFailures enabled on the event source mapping).
    """

    task_manager = ECSTaskManager(
        cluster_name='wsi-processor-cluster',
        # Family name only: ECS resolves the latest ACTIVE revision.
        # ('wsi-processor:latest' is invalid — revisions are numeric.)
        task_definition='wsi-processor',
        max_concurrent_tasks=10  # Limit concurrent tasks
    )

    launched = 0
    failures = []

    for record in event['Records']:
        task_arn = task_manager.launch_task(record)

        if task_arn:
            launched += 1
        else:
            # Hand the record back to SQS for retry rather than losing it.
            failures.append({'itemIdentifier': record['messageId']})

    skipped = len(failures)
    logger.info("Launched: %s, Skipped: %s", launched, skipped)

    return {
        'launched': launched,
        'skipped': skipped,
        'running_tasks': task_manager.get_running_task_count(),
        # Partial batch response consumed by the Lambda/SQS integration.
        'batchItemFailures': failures
    }

Implementation Details

Dynamic Concurrency Based on Cluster Capacity

def calculate_optimal_concurrency(
    self,
    task_cpu: int = 2048,
    task_memory: int = 4096,
    instance_cpu: int = 2048,
    instance_memory: int = 4096,
) -> int:
    """Estimate how many tasks fit in the cluster, with 20% headroom.

    Defaults preserve the original assumption of one 2-vCPU / 4 GB task
    per 2-vCPU / 4 GB container instance; pass real sizes to generalize.

    NOTE(review): this is based on registeredContainerInstancesCount,
    which is 0 for Fargate-only clusters — there this returns 0. Confirm
    the cluster actually uses EC2 capacity before relying on it.
    """

    # Get cluster capacity (empty list if the cluster name is wrong).
    response = self.ecs.describe_clusters(clusters=[self.cluster_name])
    clusters = response.get('clusters', [])
    if not clusters:
        return 0

    instance_count = clusters[0].get('registeredContainerInstancesCount', 0)
    registered_cpu = instance_count * instance_cpu
    registered_memory = instance_count * instance_memory

    # Concurrency is bound by whichever resource runs out first.
    tasks_by_cpu = registered_cpu // task_cpu
    tasks_by_memory = registered_memory // task_memory
    optimal = min(tasks_by_cpu, tasks_by_memory)

    # Leave 20% headroom so bursty tasks don't starve the cluster.
    return int(optimal * 0.8)

SQS Integration with Throttling

# Module-level SQS client shared by the queue-tuning helper below.
sqs = boto3.client('sqs')

def configure_sqs_throttling(queue_url: str, max_tasks: int):
    """Tune SQS queue attributes for the ECS processing pipeline.

    NOTE(review): queue attributes cannot cap concurrency, so `max_tasks`
    is currently unused here; a real per-queue concurrency limit belongs
    on the Lambda event source mapping (ScalingConfig MaximumConcurrency)
    — confirm and wire it up.
    """

    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            # Messages stay invisible for 15 minutes while a task works on
            # them; must exceed worst-case processing time or duplicates run.
            'VisibilityTimeout': '900',

            # 20-second long polling: fewer empty receives, lower cost.
            'ReceiveMessageWaitTimeSeconds': '20',  # Long polling

            # Largest accepted message body: 256 KB (the SQS maximum).
            'MaximumMessageSize': '262144'  # 256 KB
        }
    )

# Cap the launcher Lambda itself with reserved concurrency: at most 5
# simultaneous invocations, which indirectly bounds the rate of ECS
# run_task calls regardless of how deep the SQS backlog gets.
lambda_client = boto3.client('lambda')

lambda_client.put_function_concurrency(
    FunctionName='wsi-processor-launcher',
    ReservedConcurrentExecutions=5  # Max 5 Lambda instances
)

CloudWatch Alarms for Capacity

# Module-level CloudWatch client used by the alarm helper below.
cloudwatch = boto3.client('cloudwatch')

def create_capacity_alarms():
    """Create CloudWatch alarms for ECS capacity and task-failure issues.

    Both alarms notify the SNS topic named by SNS_TOPIC_ARN. When that
    env var is unset, the alarms are created without actions rather than
    passing [None] to CloudWatch (which the API rejects).
    """

    sns_topic = os.getenv('SNS_TOPIC_ARN')
    alarm_actions = [sns_topic] if sns_topic else []

    # Alarm when running tasks exceed threshold.
    # BUG FIX: RunningTaskCount is published by Container Insights under
    # the 'ECS/ContainerInsights' namespace, not 'AWS/ECS' — the alarm
    # never received data before. Container Insights must be enabled on
    # the cluster.
    cloudwatch.put_metric_alarm(
        AlarmName='WSI-Processor-High-Task-Count',
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='RunningTaskCount',
        Namespace='ECS/ContainerInsights',
        Period=60,
        Statistic='Average',
        Threshold=15,
        ActionsEnabled=True,
        AlarmActions=alarm_actions,
        Dimensions=[
            {'Name': 'ClusterName', 'Value': 'wsi-processor-cluster'}
        ]
    )

    # Alarm when the custom failure-rate metric exceeds 10% for two
    # consecutive 5-minute periods.
    cloudwatch.put_metric_alarm(
        AlarmName='WSI-Processor-High-Failure-Rate',
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='TaskFailureRate',
        Namespace='WSI/Processing',
        Period=300,
        Statistic='Average',
        Threshold=0.1,  # 10% failure rate
        ActionsEnabled=True,
        AlarmActions=alarm_actions
    )

Impact and Results

| Metric | Before | After | Improvement |
|--------|--------|-------|-------------|
| Task failure rate | 45% | 2% | 96% reduction |
| Average processing time | 18 min | 6 min | 3x faster |
| Throughput (slides/hour) | 12 | 35 | 2.9x improvement |
| Cost per slide | $0.85 | $0.32 | 62% savings |
| OOM errors | 89/day | 2/day | 98% reduction |

Concurrency optimization:

  • Previous: 50 concurrent tasks, 45% failure rate
  • Optimized: 10 concurrent tasks, 2% failure rate
  • Sweet spot: 8-12 tasks depending on slide size

Lessons Learned

  1. More != Faster: Excessive concurrency causes resource contention
  2. Measure Capacity: Know your cluster limits before setting concurrency
  3. Leave Headroom: Use 80% of capacity, not 100%
  4. Monitor Task Count: Alert when approaching limits
  5. Right-Size Tasks: Match CPU/memory to actual usage patterns