← Back

Receipt Handle Validation: Preventing SQS Message Deletion Failures

·wsi-processor

Receipt Handle Validation: Preventing SQS Message Deletion Failures

Key Takeaway

Our WSI processor failed to delete SQS messages whose receipt handles were invalid or had expired, so those messages became visible again and were reprocessed repeatedly. Adding receipt handle validation and expiration checking reduced duplicate processing from 23% to 0.2%.

The Problem

We stored receipt handles without validation or expiration tracking:

def process_message(message):
    """Naive flow: remember the receipt handle, process, then delete.

    Shown as the problematic baseline: nothing here checks whether the
    handle captured before processing is still usable afterwards.
    """
    # The handle is captured once, before the slow work begins.
    handle = message['ReceiptHandle']
    payload = message['Body']

    # WSI processing takes 5-10 minutes.
    process_wsi(payload)

    # By now the handle may be expired or invalid; a failed delete goes
    # unnoticed here and the message ends up being reprocessed.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=handle)

Issues:

  1. Expired Handles: Receipt handles expire after visibility timeout
  2. Silent Failures: Invalid deletes don't raise errors
  3. Duplicate Processing: Same message processed multiple times
  4. Wasted Resources: Reprocessing costs money and time
  5. Idempotency Required: Had to make everything idempotent due to duplicates

The Solution

import json
import logging
import os
import time
from typing import Optional

import boto3

logger = logging.getLogger(__name__)

class SQSMessageHandler:
    """Process SQS messages and delete them with receipt-handle checks.

    The handler parses the message body as JSON, hands it to
    :meth:`process_wsi`, and then deletes the message. If processing used up
    most of the visibility timeout, the timeout is refreshed first so the
    delete has a better chance of succeeding.

    Subclasses must implement :meth:`process_wsi`; :meth:`record_duplicate`
    may be overridden to publish a metric.

    Args:
        queue_url: URL of the queue the messages were received from.
        visibility_timeout: Visibility timeout (seconds) the messages were
            received with.
        sqs_client: Optional pre-built SQS client. When omitted, a client is
            created with ``boto3.client('sqs')``.
    """

    # Fraction of the visibility timeout after which the receipt handle is
    # treated as close to expiry.
    EXPIRY_WARNING_RATIO = 0.9

    def __init__(self, queue_url: str, visibility_timeout: int = 300,
                 sqs_client=None):
        self.queue_url = queue_url
        self.visibility_timeout = visibility_timeout
        self.sqs = sqs_client if sqs_client is not None else boto3.client('sqs')

    def process_wsi(self, body):
        """Process one decoded message body and return the result.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError(
            "SQSMessageHandler.process_wsi must be implemented by a subclass"
        )

    def record_duplicate(self, message_id: str) -> None:
        """Record that ``message_id`` could not be deleted and will be redelivered.

        The default implementation only logs; override to emit a metric.
        """
        logger.warning(f"Duplicate processing expected for message {message_id}")

    def process_message_safely(self, message: dict):
        """Process one message, then delete it.

        Args:
            message: Mapping with ``'ReceiptHandle'``, ``'MessageId'`` and a
                JSON-encoded ``'Body'``.

        Returns:
            Whatever :meth:`process_wsi` returned.

        Raises:
            Exception: Any error from decoding or processing the body is
                logged and re-raised; the message is then left undeleted so
                it becomes visible again after the timeout. Unexpected
                delete errors are re-raised by :meth:`delete_message_safe`.
        """
        receipt_handle = message['ReceiptHandle']
        message_id = message['MessageId']
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        start_time = time.monotonic()

        logger.info(f"Processing message {message_id}")

        try:
            body = json.loads(message['Body'])
            result = self.process_wsi(body)
        except Exception as e:
            logger.error(f"Failed to process message {message_id}: {e}")

            # Return message to queue by not deleting
            # It will become visible again after timeout
            raise

        elapsed = time.monotonic() - start_time

        if elapsed > self.visibility_timeout * self.EXPIRY_WARNING_RATIO:
            logger.warning(
                f"Processing took {elapsed}s, "
                f"extending visibility timeout"
            )

            # Best effort only: processing already succeeded, so a failed
            # extension (e.g. the handle has already expired) must not stop
            # us from attempting the delete or from returning the result.
            try:
                receipt_handle = self.extend_visibility(
                    receipt_handle, self.visibility_timeout
                )
            except Exception as e:
                logger.warning(
                    f"Could not extend visibility for message {message_id}: {e}; "
                    f"attempting delete anyway"
                )

        self.delete_message_safe(receipt_handle, message_id)

        return result

    def extend_visibility(
        self,
        receipt_handle: str,
        additional_seconds: int
    ) -> str:
        """Reset the message's visibility timeout.

        Args:
            receipt_handle: Handle from the receive that is being extended.
            additional_seconds: Value passed as ``VisibilityTimeout``.
                NOTE(review): per the SQS API this is the new timeout counted
                from the time of the call, not an increment - confirm against
                the ChangeMessageVisibility reference.

        Returns:
            The same ``receipt_handle``; the call does not issue a new one.

        Raises:
            ReceiptHandleIsInvalid: Logged and re-raised when the handle is
                no longer valid. Other client errors propagate unchanged.
        """
        try:
            self.sqs.change_message_visibility(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=additional_seconds
            )
        except self.sqs.exceptions.ReceiptHandleIsInvalid:
            logger.error("Receipt handle already expired")
            raise

        logger.info(f"Extended visibility by {additional_seconds}s")

        # Receipt handle doesn't change in boto3
        return receipt_handle

    def delete_message_safe(self, receipt_handle: str, message_id: str) -> bool:
        """Delete a message, tolerating an invalid receipt handle.

        Args:
            receipt_handle: Handle to delete with.
            message_id: Used for logging and duplicate tracking only.

        Returns:
            ``True`` when the delete call succeeded, ``False`` when the
            handle was rejected as invalid (the message will be redelivered
            and :meth:`record_duplicate` has been called).

        Raises:
            Exception: Any other delete failure is logged and re-raised.
        """
        try:
            self.sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=receipt_handle
            )
        except self.sqs.exceptions.ReceiptHandleIsInvalid:
            logger.error(
                f"Receipt handle invalid for message {message_id}, "
                f"message will be reprocessed"
            )

            # Track duplicate processing
            self.record_duplicate(message_id)
            return False
        except Exception as e:
            logger.error(f"Failed to delete message {message_id}: {e}")
            raise

        logger.info(f"Deleted message {message_id}")
        return True

# Lambda's SQS event records use lower-camel keys, while the handler expects
# the key names used by the SQS ReceiveMessage API.
_RECORD_KEY_MAP = (
    ('MessageId', 'messageId'),
    ('ReceiptHandle', 'receiptHandle'),
    ('Body', 'body'),
)


def _normalize_record(record):
    """Return a copy of ``record`` that also carries ReceiveMessage-style keys."""
    message = dict(record)
    for api_key, event_key in _RECORD_KEY_MAP:
        if api_key not in message and event_key in message:
            message[api_key] = message[event_key]
    return message


def lambda_handler(event, context):
    """Process SQS messages with proper receipt handle management.

    Args:
        event: Lambda event; ``event['Records']`` is iterated in order. Each
            record may use either Lambda's SQS event keys (``messageId``,
            ``receiptHandle``, ``body``) or ReceiveMessage-style keys.
        context: Lambda context object (unused).

    Raises:
        RuntimeError: If the ``QUEUE_URL`` environment variable is unset or
            empty.
        Exception: The first record failure is logged and re-raised so the
            invocation is reported as failed.
    """
    queue_url = os.getenv('QUEUE_URL')
    if not queue_url:
        # Fail before touching any record instead of calling SQS with None.
        raise RuntimeError("QUEUE_URL environment variable is not set")

    handler = SQSMessageHandler(queue_url, visibility_timeout=300)

    for record in event['Records']:
        try:
            handler.process_message_safely(_normalize_record(record))

        except Exception as e:
            logger.exception(f"Failed to process record: {e}")
            # Let Lambda retry
            raise

Implementation Details

Heartbeat Pattern for Long Processing

import threading

class MessageHeartbeat:
    """Extend visibility timeout periodically during long processing.

    A background daemon thread calls ``change_message_visibility`` every
    ``interval`` seconds until :meth:`stop` is called. The object can also be
    used as a context manager, which starts the thread on entry and stops it
    on exit.

    Args:
        sqs: Client object exposing ``change_message_visibility``.
        queue_url: URL of the queue the message was received from.
        receipt_handle: Receipt handle of the in-flight message.
        interval: Seconds between heartbeats. The first heartbeat is sent
            after one full interval.
        visibility_timeout: Value sent as ``VisibilityTimeout`` on each
            heartbeat. Defaults to twice the interval, but never less than
            120 seconds, so the message cannot become visible between two
            heartbeats.
    """

    def __init__(self, sqs, queue_url, receipt_handle, interval=60,
                 visibility_timeout=None):
        self.sqs = sqs
        self.queue_url = queue_url
        self.receipt_handle = receipt_handle
        self.interval = interval
        if visibility_timeout is None:
            # Keep the historical 2-minute extension as a floor and scale up
            # for longer intervals.
            visibility_timeout = max(120, int(interval * 2))
        self.visibility_timeout = visibility_timeout
        self.stop_event = threading.Event()
        self.thread = None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def start(self):
        """Start heartbeat thread (no-op if it is already running)."""
        if self.thread is not None and self.thread.is_alive():
            return
        # Clear a previous stop request so the heartbeat can be restarted.
        self.stop_event.clear()
        self.thread = threading.Thread(target=self._heartbeat_loop)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Stop heartbeat thread and wait for it to finish."""
        self.stop_event.set()
        if self.thread:
            self.thread.join()

    def _heartbeat_loop(self):
        """Periodically extend visibility timeout until stopped."""
        # Event.wait returns False on timeout (send a heartbeat) and True once
        # stop() has set the event (leave the loop).
        while not self.stop_event.wait(self.interval):
            try:
                self.sqs.change_message_visibility(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=self.receipt_handle,
                    VisibilityTimeout=self.visibility_timeout
                )
                logger.debug("Extended message visibility")
            except Exception as e:
                # Best effort: a failed heartbeat is logged and retried on the
                # next tick rather than killing the thread.
                logger.error(f"Failed to extend visibility: {e}")

# Usage
def process_long_running_task(message):
    """Run WSI processing for one message while a heartbeat is active.

    The heartbeat is started before processing and always stopped
    afterwards, whether processing returns or raises.
    """
    keepalive = MessageHeartbeat(
        sqs_client, queue_url, message['ReceiptHandle'], interval=60
    )
    keepalive.start()

    try:
        # Long processing
        return process_wsi(message['Body'])
    finally:
        keepalive.stop()

Impact and Results

| Metric | Before | After |
|--------|--------|-------|
| Duplicate processing rate | 23% | 0.2% |
| Failed message deletions | 67/day | 2/day |
| Wasted processing cost | $145/month | $6/month |

Lessons Learned

  1. Validate Receipt Handles: Check expiration before using
  2. Extend for Long Tasks: Use visibility timeout extension for processing >5min
  3. Heartbeat Pattern: Periodically extend timeout during long operations
  4. Monitor Duplicates: Track and alert on duplicate processing
  5. Design for Idempotency: Even with validation, duplicates can occur