Source code for core_aws.services.lambdas.events.sqs_queue

# -*- coding: utf-8 -*-

"""
AWS Lambda SQS queue event record.

This module provides a wrapper for SQS queue records received by AWS Lambda
functions, including access to message body, attributes, and receipt handles.
"""

from typing import Any
from typing import Dict
from typing import Optional

from .base import EventRecord
from .base import EventSource


class SqsRecord(EventRecord):
    """
    Represents a record from an AWS SQS queue in a Lambda event.

    Wraps one SQS message as delivered to a Lambda function and exposes its
    body, system attributes, custom message attributes, receipt handle,
    originating queue ARN and MD5 digests through read-only properties.
    The receipt handle can be used to delete the message after processing.

    Example:
        .. code-block:: python

            # Lambda handler receiving SQS event
            def lambda_handler(event, context):
                for raw_record in event['Records']:
                    record = EventRecord.from_dict(raw_record)

                    if isinstance(record, SqsRecord):
                        # Access message content
                        print(f"Message: {record.message}")
                        print(f"Message ID: {record.message_id}")

                        # Access SQS metadata
                        print(f"Queue ARN: {record.queue_arn}")
                        print(f"Receipt Handle: {record.receipt_handle}")

                        # Access message attributes
                        if record.message_attributes:
                            for key, value in record.message_attributes.items():
                                print(f"Attribute {key}: {value}")

                        # Access system attributes
                        sent_timestamp = record.attributes.get("SentTimestamp")
                        print(f"Sent at: {sent_timestamp}")

            # Process JSON from SQS
            import json

            record = SqsRecord.from_dict(sqs_event_record)
            data = json.loads(record.message)
            process_message(data)
        ..
    """

    _source = EventSource.SQS_QUEUE

    # noinspection PyPep8Naming
    def __init__(
        self,
        eventSource: str,
        eventSourceARN: str,
        awsRegion: str,
        messageId: str,
        receiptHandle: str,
        body: str,
        md5OfBody: str,
        attributes: Dict[str, Any],
        messageAttributes: Dict[str, Any],
        md5OfMessageAttributes: Optional[str] = None,
        **kwargs: Any
    ) -> None:
        """
        Initialize an SQS queue event record.

        Parameter names are camelCase on purpose: they mirror the keys of the
        raw SQS record so the record dict can be unpacked straight into this
        constructor.

        :param eventSource: Source identifier (always "aws:sqs" for SQS).
        :param eventSourceARN: ARN of the SQS queue.
        :param awsRegion: AWS region where the queue exists.
        :param messageId: Unique message identifier.
        :param receiptHandle: Receipt handle for deleting the message.
        :param body: Message body content (often JSON-encoded).
        :param md5OfBody: MD5 hash of the message body.
        :param attributes:
            System attributes containing message metadata. Common attributes:

            .. code-block:: python

                {
                    "ApproximateReceiveCount": "1",
                    "SentTimestamp": "1702071427000",
                    "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
                    "ApproximateFirstReceiveTimestamp": "1702071427000"
                }
            ..

        :param messageAttributes:
            Custom message attributes. Structure:

            .. code-block:: python

                {
                    "attribute_name": {
                        "stringValue": "value",
                        "dataType": "String"
                    }
                }
            ..

        :param md5OfMessageAttributes:
            MD5 hash of message attributes. Optional; defaults to ``None``.
        :param kwargs: Additional fields (ignored, for forward compatibility).
        """

        # NOTE(review): the base-class initialiser is not invoked here;
        # confirm that EventRecord requires no set-up of its own.
        self._event_source = eventSource
        self._event_source_arn = eventSourceARN
        self._aws_region = awsRegion
        self._message_id = messageId
        self._receipt_handle = receiptHandle
        self._body = body
        self._attributes = attributes
        self._message_attributes = messageAttributes
        self._md5_of_body = md5OfBody
        self._md5_of_message_attributes = md5OfMessageAttributes

    @property
    def message_id(self) -> str:
        """
        Get the unique message ID for this SQS message.

        :return:
            SQS message ID (UUID format).
            Example: "19dd0b57-b21e-4ac1-bd88-01bbb068cb78"
        """

        return self._message_id

    @property
    def message(self) -> str:
        """
        Get the message body from the SQS message.

        The body is returned exactly as received; no decoding or parsing
        is performed.

        :return:
            Message body. Often JSON-encoded string that needs further parsing.

        Example:
            .. code-block:: python

                record = SqsRecord(...)
                message = record.message
                # '{"order_id": 123, "status": "pending"}'

                # Parse JSON if needed
                import json
                data = json.loads(record.message)
                print(data['order_id'])  # 123
            ..
        """

        return self._body

    @property
    def receipt_handle(self) -> str:
        """
        Get the receipt handle for this SQS message.

        The receipt handle is required to delete the message from the queue
        after processing. Each time a message is received, a new receipt
        handle is provided.

        :return:
            Receipt handle string (opaque token).
            Example: "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..."

        Example:
            .. code-block:: python

                from boto3 import client

                sqs = client('sqs')
                record = SqsRecord(...)

                # Delete message after processing
                sqs.delete_message(
                    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
                    ReceiptHandle=record.receipt_handle
                )
            ..
        """

        return self._receipt_handle

    @property
    def queue_arn(self) -> str:
        """
        Get the ARN of the SQS queue that this message came from.

        This is the ``eventSourceARN`` value given to the constructor.

        :return:
            SQS queue ARN.
            Example: "arn:aws:sqs:us-east-1:123456789012:my-queue"
        """

        return self._event_source_arn

    @property
    def attributes(self) -> Dict[str, Any]:
        """
        Get the system attributes for this SQS message.

        System attributes contain metadata about the message such as
        timestamps, sender information, and receive count.

        :return:
            Dictionary of system attributes:

            .. code-block:: python

                {
                    "ApproximateReceiveCount": "1",
                    "SentTimestamp": "1702071427000",
                    "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
                    "ApproximateFirstReceiveTimestamp": "1702071427000"
                }
            ..

        Example:
            .. code-block:: python

                record = SqsRecord(...)

                # Check how many times message was received
                receive_count = int(record.attributes.get("ApproximateReceiveCount", "0"))
                if receive_count > 3:
                    print("Message has been received multiple times")

                # Get sent timestamp
                sent_ts = record.attributes.get("SentTimestamp")
                print(f"Sent at: {sent_ts}")
            ..
        """

        return self._attributes

    @property
    def message_attributes(self) -> Dict[str, Any]:
        """
        Get the custom message attributes for this SQS message.

        Message attributes are custom metadata set by the message producer.
        The value is returned exactly as it was passed to the constructor;
        no defaulting is applied here.

        :return:
            Dictionary of message attributes with structure:

            .. code-block:: python

                {
                    "attribute_name": {
                        "stringValue": "value",
                        "dataType": "String"
                    }
                }
            ..

        Example:
            .. code-block:: python

                record = SqsRecord(...)
                attrs = record.message_attributes

                # Check for priority attribute
                if "priority" in attrs:
                    priority = attrs["priority"]["stringValue"]
                    print(f"Priority: {priority}")

                # Extract all string attributes
                for name, attr in attrs.items():
                    if attr["dataType"] == "String":
                        print(f"{name}: {attr['stringValue']}")
            ..
        """

        return self._message_attributes

    @property
    def md5_of_body(self) -> str:
        """
        Get the MD5 hash of the message body.

        Can be used to verify message integrity.

        :return:
            MD5 hash string.
            Example: "098f6bcd4621d373cade4e832627b4f6"
        """

        return self._md5_of_body

    @property
    def md5_of_message_attributes(self) -> Optional[str]:
        """
        Get the MD5 hash of the message attributes.

        Counterpart of :attr:`md5_of_body` for the custom message attributes.

        :return:
            MD5 hash string as given to the constructor, or ``None`` when
            ``md5OfMessageAttributes`` was not supplied.
        """

        return self._md5_of_message_attributes