Source code for core_aws.services.lambdas.decorators

# -*- coding: utf-8 -*-

"""
AWS Lambda SQS message processing decorators.

This module provides decorators for processing SQS messages in Lambda functions
with automatic error handling and batch failure reporting using AWS Lambda's
partial batch response feature.

Type Aliases:
  - MessageHandler: Function that processes individual SQS messages/records.
  - LambdaHandler: Standard AWS Lambda handler function signature.
  - BatchResponse: Lambda response format for batch item failures.
  - DecoratedHandler: Lambda handler that returns batch failure information.
"""

from __future__ import annotations

from functools import wraps
from logging import Logger
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from core_aws.services.lambdas.events.base import EventRecord
from core_aws.typing_ import LambdaContext

# Type aliases for cleaner decorator signatures...
BatchResponse = Dict[str, List[Dict[str, str]]]
DecoratedHandler = Callable[[Dict[str, Any], LambdaContext], BatchResponse]
MessageHandler = Callable[[Union[EventRecord, Dict[str, Any]]], Any]
LambdaHandler = Callable[[Dict[str, Any], LambdaContext], Any]


[docs] def process_sqs_batch( message_handler: MessageHandler, logger: Logger, post_process_fcn: Optional[Callable[[], None]] = None, ) -> Callable[[LambdaHandler], DecoratedHandler]: """ Process SQS messages with partial batch failure reporting (RECOMMENDED). This decorator implements AWS Lambda's partial batch response feature for SQS, which is the **recommended and safer approach** compared to manual message deletion. Failed messages are automatically returned to the queue for retry, while successful messages are deleted by Lambda. **IMPORTANT**: You must enable "Report batch item failures" in your Lambda trigger configuration for this decorator to work properly. References: - https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting - https://repost.aws/knowledge-center/lambda-sqs-report-batch-item-failures :param message_handler: Function that processes each individual message (MessageHandler). Receives an EventRecord instance or Dict containing the message data. Should raise an exception if processing fails to mark the message for retry. :param logger: Logger instance for tracking message processing and errors. :param post_process_fcn: Optional cleanup function called after all messages are processed, regardless of success or failure (e.g., closing database connections, releasing resources). Receives no arguments and returns None. :return: Decorator function that transforms a LambdaHandler into a DecoratedHandler that returns batch failure information. Example: .. code-block:: python import logging from core_aws.services.lambdas.decorators import process_sqs_batch from core_aws.services.lambdas.events import SqsRecord logger = logging.getLogger(__name__) def process_message(record): \"\"\"Process individual SQS message.\"\"\" # Access message content message_data = json.loads(record.message) # Process the data result = do_something(message_data) # Raise exception on failure to mark message as failed if not result: raise ValueError("Processing failed") def cleanup(): \"\"\"Optional cleanup after batch processing.\"\"\" db_connection.close() @process_sqs_batch( message_handler=process_message, logger=logger, post_process_fcn=cleanup ) def lambda_handler(event, context): \"\"\" Lambda handler - executed before processing messages. Body is optional - use for initialization if needed. \"\"\" logger.info("Lambda invoked") # Optional: initialization code here # Lambda returns: # { # "batchItemFailures": [ # {"itemIdentifier": "failed-message-id-1"}, # {"itemIdentifier": "failed-message-id-2"} # ] # } .. Benefits: 1. **No manual deletion**: Lambda handles message deletion automatically 2. **Safer**: Failed messages stay in queue for retry 3. **Simpler**: No need to manage SQS client or queue URLs 4. **AWS recommended**: Official pattern for SQS + Lambda 5. **Better visibility**: CloudWatch metrics show batch failures Configuration: Enable "Report batch item failures" in Lambda trigger: 1. Go to Lambda console > Your function > Configuration > Triggers 2. Edit SQS trigger 3. Expand "Additional settings" 4. Enable "Report batch item failures" 5. Save changes """ def decorator(handler: LambdaHandler) -> DecoratedHandler: @wraps(handler) def execute(event: Dict[str, Any], context: LambdaContext) -> BatchResponse: logger.info("Starting the execution of handler function...") handler(event, context) logger.info("Processing the incoming event and the records within it...") batch_item_failures: List[Dict[str, str]] = [] for raw_record in event.get("Records", []): record = EventRecord.from_dict(raw_record) if isinstance(record, EventRecord): message_id = record.message_id else: # Fallback for unknown event sources... message_id = raw_record.get("messageId", "unknown") try: logger.info(f"Processing message [{message_id}]...") message_handler(record) logger.info(f"Message [{message_id}] processed successfully!") except Exception as error: # Add to failures, Lambda will not delete these messages... batch_item_failures.append({"itemIdentifier": message_id}) logger.error({ "MessageId": message_id, "error_type": type(error).__name__, "error": str(error), }) if post_process_fcn: logger.info("Processing post process function...") post_process_fcn() logger.info( f"Execution Done! Successful: {len(event.get('Records', [])) - len(batch_item_failures)}, " f"Failed: {len(batch_item_failures)}." ) return {"batchItemFailures": batch_item_failures} return execute return decorator