Receipt Handle Validation: Preventing SQS Message Deletion Failures
Key Takeaway
Our WSI processor crashed when trying to delete SQS messages with invalid or expired receipt handles, causing messages to be reprocessed repeatedly. Adding receipt handle validation and expiration checking reduced duplicate processing from 23% to 0.2%.
The Problem
We stored receipt handles without validation or expiration tracking:
def process_message(message):
receipt_handle = message['ReceiptHandle']
# Process WSI (takes 5-10 minutes)
process_wsi(message['Body'])
# Try to delete - receipt handle may be expired!
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle # May be invalid
) # Fails silently, message reprocessed!
Issues:
- Expired Handles: Receipt handles expire after visibility timeout
- Silent Failures: Invalid deletes don't raise errors
- Duplicate Processing: Same message processed multiple times
- Wasted Resources: Reprocessing costs money and time
- Idempotency Required: Had to make everything idempotent due to duplicates
The Solution
import time
from typing import Optional
import boto3
class SQSMessageHandler:
"""Handle SQS messages with receipt validation"""
def __init__(self, queue_url: str, visibility_timeout: int = 300):
self.queue_url = queue_url
self.visibility_timeout = visibility_timeout
self.sqs = boto3.client('sqs')
def process_message_safely(self, message: dict):
"""Process message with receipt handle management"""
receipt_handle = message['ReceiptHandle']
message_id = message['MessageId']
start_time = time.time()
logger.info(f"Processing message {message_id}")
try:
# Process message
body = json.loads(message['Body'])
result = self.process_wsi(body)
# Check if receipt handle is still valid
elapsed = time.time() - start_time
if elapsed > self.visibility_timeout * 0.9: # 90% of timeout
logger.warning(
f"Processing took {elapsed}s, "
f"extending visibility timeout"
)
# Extend visibility timeout
new_receipt = self.extend_visibility(receipt_handle, 300)
receipt_handle = new_receipt
# Delete message
self.delete_message_safe(receipt_handle, message_id)
return result
except Exception as e:
logger.error(f"Failed to process message {message_id}: {e}")
# Return message to queue by not deleting
# It will become visible again after timeout
raise
def extend_visibility(
self,
receipt_handle: str,
additional_seconds: int
) -> str:
"""Extend visibility timeout and get new receipt handle"""
try:
response = self.sqs.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=additional_seconds
)
logger.info(f"Extended visibility by {additional_seconds}s")
# Receipt handle doesn't change in boto3
return receipt_handle
except self.sqs.exceptions.ReceiptHandleIsInvalid:
logger.error("Receipt handle already expired")
raise
def delete_message_safe(self, receipt_handle: str, message_id: str):
"""Delete message with validation"""
try:
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=receipt_handle
)
logger.info(f"Deleted message {message_id}")
except self.sqs.exceptions.ReceiptHandleIsInvalid:
logger.error(
f"Receipt handle invalid for message {message_id}, "
f"message will be reprocessed"
)
# Track duplicate processing
self.record_duplicate(message_id)
except Exception as e:
logger.error(f"Failed to delete message {message_id}: {e}")
raise
def lambda_handler(event, context):
"""Process SQS messages with proper receipt handle management"""
queue_url = os.getenv('QUEUE_URL')
handler = SQSMessageHandler(queue_url, visibility_timeout=300)
for record in event['Records']:
try:
handler.process_message_safely(record)
except Exception as e:
logger.exception(f"Failed to process record: {e}")
# Let Lambda retry
raise
Implementation Details
Heartbeat Pattern for Long Processing
import threading
class MessageHeartbeat:
"""Extend visibility timeout periodically during long processing"""
def __init__(self, sqs, queue_url, receipt_handle, interval=60):
self.sqs = sqs
self.queue_url = queue_url
self.receipt_handle = receipt_handle
self.interval = interval
self.stop_event = threading.Event()
self.thread = None
def start(self):
"""Start heartbeat thread"""
self.thread = threading.Thread(target=self._heartbeat_loop)
self.thread.daemon = True
self.thread.start()
def stop(self):
"""Stop heartbeat thread"""
self.stop_event.set()
if self.thread:
self.thread.join()
def _heartbeat_loop(self):
"""Periodically extend visibility timeout"""
while not self.stop_event.wait(self.interval):
try:
self.sqs.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=self.receipt_handle,
VisibilityTimeout=120 # Extend by 2 minutes
)
logger.debug("Extended message visibility")
except Exception as e:
logger.error(f"Failed to extend visibility: {e}")
# Usage
def process_long_running_task(message):
"""Process task with heartbeat"""
heartbeat = MessageHeartbeat(
sqs_client,
queue_url,
message['ReceiptHandle'],
interval=60
)
heartbeat.start()
try:
# Long processing
result = process_wsi(message['Body'])
return result
finally:
heartbeat.stop()
Impact and Results
| Metric | Before | After | |--------|--------|-------| | Duplicate processing rate | 23% | 0.2% | | Failed message deletions | 67/day | 2/day | | Wasted processing cost | $145/month | $6/month |
Lessons Learned
- Validate Receipt Handles: Check expiration before using
- Extend for Long Tasks: Use visibility timeout extension for processing >5min
- Heartbeat Pattern: Periodically extend timeout during long operations
- Monitor Duplicates: Track and alert on duplicate processing
- Design for Idempotency: Even with validation, duplicates can occur