Events
Base class for events
AWS Lambda event record base classes and utilities.
This module provides base classes for handling AWS Lambda event records from various sources (Kinesis, SQS, SNS). It implements a factory pattern that automatically instantiates the correct record type based on the event source.
- class core_aws.services.lambdas.events.base.EventSource(*values)
AWS event source identifiers for Lambda function triggers.
These values correspond to the “eventSource” or “EventSource” field in Lambda event records.
- KINESIS_DATA_STREAM = 'aws:kinesis'
- SNS_TOPIC = 'aws:sns'
- SQS_QUEUE = 'aws:sqs'
- class core_aws.services.lambdas.events.base.EventRecord
Bases: ABC
Abstract base class for AWS Lambda event records.
This class provides a factory pattern for creating specific record types (SqsRecord, SnsRecord, KinesisRecord) based on the event source. Subclasses are automatically registered when defined with a _source attribute.
The factory pattern allows automatic deserialization of Lambda event records:
Example
    from core_aws.services.lambdas.events.base import EventRecord
    from core_aws.services.lambdas.events.sqs_queue import SqsRecord

    # Lambda handler receiving SQS event
    def lambda_handler(event, context):
        for raw_record in event['Records']:
            # Factory automatically creates SqsRecord instance
            record = EventRecord.from_dict(raw_record)

            # Handle unknown event sources gracefully
            if isinstance(record, dict):
                # Unknown source - got raw dict back
                print(f"Unknown event source: {record.get('eventSource')}")
                continue

            # Known source - got EventRecord subclass; access common interface
            print(f"Message ID: {record.message_id}")
            print(f"Body: {record.message}")

            # Type-specific access
            if isinstance(record, SqsRecord):
                print(f"Receipt Handle: {record.receipt_handle}")

            # process_message is your own handler function
            process_message(record.message)
- Subclass Implementation:
To create a new event record type:
    class CustomRecord(EventRecord):
        _source = EventSource.SQS_QUEUE  # Register with factory

        def __init__(self, messageId: str, body: str, **kwargs):
            self._message_id = messageId
            self._body = body

        @property
        def message_id(self) -> str:
            return self._message_id

        @property
        def message(self) -> str:
            return self._body
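This reference does not show how subclasses end up registered with the factory. One common way to get that behaviour in Python is the `__init_subclass__` hook. The sketch below is a self-contained illustration under that assumption, not the library's actual code; `DemoSource`, `DemoRecord` and `DemoQueueRecord` are hypothetical stand-ins for `EventSource`, `EventRecord` and a concrete record type.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Type, Union


class DemoSource(Enum):
    """Hypothetical stand-in for EventSource."""
    QUEUE = "aws:sqs"


class DemoRecord(ABC):
    """Hypothetical base class that registers subclasses by source."""

    # Maps a source identifier string to the class that handles it.
    _registry: Dict[str, Type["DemoRecord"]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Only register classes that declare _source themselves.
        source = cls.__dict__.get("_source")
        if source is not None:
            DemoRecord._registry[source.value] = cls

    @property
    @abstractmethod
    def message(self) -> str:
        """Payload of the record."""

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> Union["DemoRecord", Dict[str, Any]]:
        # Accept both key casings, as the real factory is documented to do.
        source = raw.get("eventSource") or raw.get("EventSource")
        record_cls = cls._registry.get(source)
        # Unknown sources fall through as the untouched raw dict.
        return record_cls(**raw) if record_cls else raw


class DemoQueueRecord(DemoRecord):
    _source = DemoSource.QUEUE  # triggers registration at class creation

    def __init__(self, body: str, **kwargs: Any) -> None:
        self._body = body

    @property
    def message(self) -> str:
        return self._body


record = DemoRecord.from_dict({"eventSource": "aws:sqs", "body": "hello"})
print(type(record).__name__, record.message)
```

Registering at class-definition time means a new record type only has to be imported to become available to the factory; no central lookup table needs editing.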
- _subclasses: Dict[str, Callable[[...], EventRecord]] = {EventSource.KINESIS_DATA_STREAM: <class 'core_aws.services.lambdas.events.kinesis_stream.KinesisRecord'>, EventSource.SNS_TOPIC: <class 'core_aws.services.lambdas.events.sns_topic.SnsRecord'>, EventSource.SQS_QUEUE: <class 'core_aws.services.lambdas.events.sqs_queue.SqsRecord'>}
- _source: EventSource
- abstract property message_id: str
Get the unique identifier for this message/record.
- Returns:
Message ID string (format varies by event source).
Example
SQS: “19dd0b57-b21e-4ac1-bd88-01bbb068cb78”
SNS: “95df01b4-ee98-5cb9-9903-4c221d41eb5e”
Kinesis: Sequence number
- abstract property message: str
Get the message payload/body.
- Returns:
Message content as string (may be JSON-encoded).
Example
SQS: Body field content
SNS: Message field content
Kinesis: Base64-decoded data
- classmethod from_dict(message: Dict[str, Any]) -> EventRecord | Dict[str, Any]
Factory method to create appropriate EventRecord subclass from raw event data. Inspects the “eventSource” or “EventSource” field to determine the correct record type, then instantiates and returns it. If the event source is not recognized, returns the raw dictionary unchanged.
- Parameters:
message – Raw event record dictionary from Lambda event. Must contain “eventSource” or “EventSource” field.
- Returns:
EventRecord subclass instance if source is recognized
Raw dict if event source is unknown/unsupported
Example
    import logging

    logger = logging.getLogger(__name__)

    # SQS event record
    sqs_record = {
        "eventSource": "aws:sqs",
        "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
        "body": '{"order_id": 123}',
        "receiptHandle": "AQEBwJ...",
        # ... other fields
    }
    record = EventRecord.from_dict(sqs_record)
    # Returns: SqsRecord instance
    print(record.message_id)  # "19dd0b57-b21e-4ac1-bd88-01bbb068cb78"
    print(record.message)  # '{"order_id": 123}'

    # Unknown event source
    unknown_record = {
        "eventSource": "aws:unknown",
        "data": "something"
    }
    record = EventRecord.from_dict(unknown_record)
    # Returns: Dict (unchanged)
    print(record)  # {'eventSource': 'aws:unknown', 'data': 'something'}

    # Batch processing in Lambda
    def lambda_handler(event, context):
        for raw_record in event['Records']:
            record = EventRecord.from_dict(raw_record)
            if isinstance(record, dict):
                # Unrecognized source
                logger.warning(f"Unknown source: {record.get('eventSource')}")
                continue
            # Process recognized record types (process_message is your own function)
            process_message(record.message_id, record.message)
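Because the source field is spelled "eventSource" by some services and "EventSource" by others, it can be handy to normalise the lookup before deciding what to do with a batch. The following is a small standalone sketch using only plain dictionaries; `event_source_of`, `group_by_source` and `KNOWN_SOURCES` are hypothetical names for illustration and are not part of this package.

```python
from typing import Any, Dict, List, Optional

# The three identifiers listed under EventSource above.
KNOWN_SOURCES = {"aws:kinesis", "aws:sns", "aws:sqs"}


def event_source_of(raw: Dict[str, Any]) -> Optional[str]:
    """Return the source identifier, accepting either key casing."""
    return raw.get("eventSource") or raw.get("EventSource")


def group_by_source(records: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Bucket raw records by source; unrecognised ones go under 'unknown'."""
    groups: Dict[str, List[Dict[str, Any]]] = {}
    for raw in records:
        source = event_source_of(raw)
        key = source if source in KNOWN_SOURCES else "unknown"
        groups.setdefault(key, []).append(raw)
    return groups


batch = [
    {"eventSource": "aws:sqs", "messageId": "a"},
    {"EventSource": "aws:sns", "Sns": {}},
    {"eventSource": "aws:unknown", "data": "something"},
]
for source, items in group_by_source(batch).items():
    print(source, len(items))
```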
Kinesis Event
AWS Lambda Kinesis Data Stream event record.
This module provides a wrapper for Kinesis Data Stream records received by AWS Lambda functions, handling automatic base64 decoding of the data payload.
- class core_aws.services.lambdas.events.kinesis_stream.KinesisRecord(kinesis: Dict[str, Any], eventSource: str, eventVersion: str, eventID: str, eventName: str, invokeIdentityArn: str, awsRegion: str, eventSourceARN: str, **kwargs: Any)
Bases: EventRecord
Represents a record from AWS Kinesis Data Stream in a Lambda event. Automatically decodes base64-encoded Kinesis data and provides access to record metadata including sequence number, partition key, and event details.
Example
    # Lambda handler receiving Kinesis stream event
    def lambda_handler(event, context):
        for raw_record in event['Records']:
            record = EventRecord.from_dict(raw_record)
            if isinstance(record, KinesisRecord):
                # Access decoded message
                print(f"Message: {record.message}")
                # Access Kinesis metadata
                print(f"Sequence: {record.sequence_number}")
                print(f"Partition Key: {record.partition_key}")
                print(f"Schema Version: {record._kinesis['kinesisSchemaVersion']}")

    # Process JSON from Kinesis
    import json

    record = KinesisRecord.from_dict(kinesis_event_record)
    data = json.loads(record.message)
    process_event(data)
- _source: EventSource = 'aws:kinesis'
- __init__(kinesis: Dict[str, Any], eventSource: str, eventVersion: str, eventID: str, eventName: str, invokeIdentityArn: str, awsRegion: str, eventSourceARN: str, **kwargs: Any) -> None
Initialize a Kinesis Data Stream event record.
- Parameters:
kinesis –
Kinesis-specific data containing the record payload and metadata. Structure:
    {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "user-123",
        "sequenceNumber": "49647175778160097793486557372840800878012000746547970050",
        "data": "eyJldmVudCI6ICJsb2dpbiJ9",  # base64-encoded
        "approximateArrivalTimestamp": 1702071427.66
    }
eventSource – Source identifier (always “aws:kinesis” for Kinesis).
eventVersion – Event format version.
eventID – Unique event identifier (shard ID:sequence number).
eventName – Event name (typically “aws:kinesis:record”).
invokeIdentityArn – ARN of the identity invoking the Lambda.
awsRegion – AWS region where the stream exists.
eventSourceARN – ARN of the Kinesis stream.
kwargs – Additional fields (ignored, for forward compatibility).
- property message_id: str
Get the unique event ID for this Kinesis record.
- Returns:
Event ID in the format “{shardId}:{sequenceNumber}”. Example: “shardId-000000000001:49590338271490256608559692538361571095921575989136588898”
- property message: str
Get the decoded message data from the Kinesis record. Automatically decodes the base64-encoded data field and returns it as a UTF-8 string.
- Returns:
Decoded message content. Often JSON-encoded string that needs further parsing.
Example
    record = KinesisRecord(...)
    message = record.message  # '{"event": "login", "user_id": 123}'

    # Parse JSON if needed
    import json

    data = json.loads(record.message)
    print(data['event'])  # "login"
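To make the decoding step concrete, here is a standalone sketch of what "base64-decode, then read as UTF-8" amounts to, using only the standard library. It operates on the sample `data` value from the `kinesis` structure above; `decode_kinesis_data` is a hypothetical helper name, and the property's real implementation may differ in detail.

```python
import base64
import json


def decode_kinesis_data(data_b64: str) -> str:
    """Decode a base64-encoded Kinesis payload into a UTF-8 string."""
    return base64.b64decode(data_b64).decode("utf-8")


# Sample payload taken from the "data" field shown in the kinesis structure.
payload = decode_kinesis_data("eyJldmVudCI6ICJsb2dpbiJ9")
print(payload)  # {"event": "login"}

# The decoded string is still text; parse it if it holds JSON.
event = json.loads(payload)
print(event["event"])  # login
```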
- property sequence_number: str
Get the sequence number for this Kinesis record.
- Returns:
Sequence number string. Used for ordering and checkpointing. Example: “49647175778160097793486557372840800878012000746547970050”
- property partition_key: str
Get the partition key for this Kinesis record.
- Returns:
Partition key that determined which shard received this record. Example: “user-123”
- property approximate_arrival_timestamp: float
Get the approximate arrival timestamp for this record.
- Returns:
Unix timestamp (seconds since epoch) when record arrived in the stream. Example: 1702071427.66
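Since the value is a plain float of epoch seconds, turning it into a timezone-aware `datetime` is usually the first step before logging or computing lag. A minimal sketch with the standard library follows; `arrival_datetime` and `lag_seconds` are hypothetical helper names.

```python
from datetime import datetime, timezone


def arrival_datetime(timestamp: float) -> datetime:
    """Convert epoch seconds into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def lag_seconds(timestamp: float, now: datetime) -> float:
    """Seconds elapsed between arrival in the stream and `now`."""
    return (now - arrival_datetime(timestamp)).total_seconds()


arrived = arrival_datetime(1702071427.66)
print(arrived.isoformat())
print(lag_seconds(1702071427.66, datetime.now(timezone.utc)))
```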
SNS Event
AWS Lambda SNS topic event record.
This module provides a wrapper for SNS topic records received by AWS Lambda functions, including access to message content, attributes, and metadata.
- class core_aws.services.lambdas.events.sns_topic.SnsRecord(EventSource: str, EventSubscriptionArn: str, EventVersion: str, Sns: Dict[str, Any], **kwargs: Any)
Bases: EventRecord
Represents a record from AWS SNS topic in a Lambda event.
Provides access to SNS message content, attributes, subject, timestamp, and other metadata from notifications published to SNS topics.
Example
    # Lambda handler receiving SNS event
    def lambda_handler(event, context):
        for raw_record in event['Records']:
            record = EventRecord.from_dict(raw_record)
            if isinstance(record, SnsRecord):
                # Access message content
                print(f"Message: {record.message}")
                print(f"Subject: {record.subject}")
                # Access SNS metadata
                print(f"Topic ARN: {record.topic_arn}")
                print(f"Timestamp: {record.timestamp}")
                # Access message attributes
                if record.message_attributes:
                    for key, value in record.message_attributes.items():
                        print(f"Attribute {key}: {value}")

    # Process JSON from SNS
    import json

    record = SnsRecord.from_dict(sns_event_record)
    data = json.loads(record.message)
    process_notification(data)
- _source: EventSource = 'aws:sns'
- __init__(EventSource: str, EventSubscriptionArn: str, EventVersion: str, Sns: Dict[str, Any], **kwargs: Any) -> None
Initialize an SNS topic event record.
- Parameters:
EventSource – Source identifier (always “aws:sns” for SNS).
EventSubscriptionArn – ARN of the SNS subscription that triggered this event.
EventVersion – Event format version.
Sns –
SNS-specific data containing the message and metadata. Structure:
    {
        "Type": "Notification",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
        "Subject": "Order Notification",
        "Message": '{"order_id": 123, "status": "shipped"}',
        "Timestamp": "2024-01-01T12:00:00.000Z",
        "SignatureVersion": "1",
        "Signature": "...",
        "SigningCertUrl": "https://...",
        "UnsubscribeUrl": "https://...",
        "MessageAttributes": {
            "priority": {
                "Type": "String",
                "Value": "high"
            }
        }
    }
kwargs – Additional fields (ignored, for forward compatibility).
- property message_id: str
Get the unique message ID for this SNS notification.
- Returns:
SNS message ID (UUID format). Example: “95df01b4-ee98-5cb9-9903-4c221d41eb5e”
- property message: str
Get the message content from the SNS notification.
- Returns:
Message body. Often JSON-encoded string that needs further parsing.
Example
    record = SnsRecord(...)
    message = record.message  # '{"order_id": 123, "status": "shipped"}'

    # Parse JSON if needed
    import json

    data = json.loads(record.message)
    print(data['order_id'])  # 123
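The message is only "often" JSON, so a handler that calls `json.loads` unconditionally will raise on plain-text notifications. One defensive option, sketched here on plain strings, is to fall back to the raw text; `parse_message` is a hypothetical helper, not something this package provides.

```python
import json
from typing import Any


def parse_message(message: str) -> Any:
    """Return the parsed JSON value, or the original string if it is not JSON."""
    try:
        return json.loads(message)
    except json.JSONDecodeError:
        return message


print(parse_message('{"order_id": 123, "status": "shipped"}'))
print(parse_message("Disk usage is above threshold"))
```

Note that bare JSON scalars such as `"42"` also parse successfully, so callers that expect an object should still check the type of the result.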
- property topic_arn: str
Get the ARN of the SNS topic that published this message.
- Returns:
SNS topic ARN. Example: “arn:aws:sns:us-east-1:123456789012:my-topic”
- property subject: str | None
Get the subject of the SNS notification (if provided).
- Returns:
Subject string or None if no subject was provided. Example: “Order Notification”
- property timestamp: str
Get the timestamp when the message was published.
- Returns:
ISO 8601 timestamp string. Example: “2024-01-01T12:00:00.000Z”
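The property returns the timestamp as a string, so date arithmetic needs a parsing step. The sketch below assumes the exact shape shown in the example (millisecond precision with a trailing "Z") and uses `strptime`, because `datetime.fromisoformat` only accepts the "Z" suffix from Python 3.11 onward. `parse_sns_timestamp` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def parse_sns_timestamp(value: str) -> datetime:
    """Parse a timestamp like '2024-01-01T12:00:00.000Z' into an aware UTC datetime."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    # The trailing "Z" means UTC; strptime leaves the result naive, so attach it.
    return parsed.replace(tzinfo=timezone.utc)


published = parse_sns_timestamp("2024-01-01T12:00:00.000Z")
print(published.isoformat())
```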
- property message_attributes: Dict[str, Any]
Get the message attributes (custom metadata) from the SNS notification.
- Returns:
Dictionary of message attributes with structure:
    {
        "attribute_name": {
            "Type": "String" | "Number" | "Binary",
            "Value": "attribute_value"
        }
    }
Returns empty dict if no attributes present.
Example
    record = SnsRecord(...)
    attrs = record.message_attributes
    if "priority" in attrs:
        priority = attrs["priority"]["Value"]
        print(f"Priority: {priority}")

    # Extract all string attributes
    for name, attr in attrs.items():
        if attr["Type"] == "String":
            print(f"{name}: {attr['Value']}")
SQS Event
AWS Lambda SQS queue event record.
This module provides a wrapper for SQS queue records received by AWS Lambda functions, including access to message body, attributes, and receipt handles.
- class core_aws.services.lambdas.events.sqs_queue.SqsRecord(eventSource: str, eventSourceARN: str, awsRegion: str, messageId: str, receiptHandle: str, body: str, md5OfBody: str, attributes: Dict[str, Any], messageAttributes: Dict[str, Any], md5OfMessageAttributes: str | None = None, **kwargs: Any)
Bases: EventRecord
Represents a record from AWS SQS queue in a Lambda event.
Provides access to SQS message body, attributes, receipt handle, and metadata from messages pulled from SQS queues. The receipt handle can be used to delete the message after processing.
Example
    # Lambda handler receiving SQS event
    def lambda_handler(event, context):
        for raw_record in event['Records']:
            record = EventRecord.from_dict(raw_record)
            if isinstance(record, SqsRecord):
                # Access message content
                print(f"Message: {record.message}")
                print(f"Message ID: {record.message_id}")
                # Access SQS metadata
                print(f"Queue ARN: {record.queue_arn}")
                print(f"Receipt Handle: {record.receipt_handle}")
                # Access message attributes
                if record.message_attributes:
                    for key, value in record.message_attributes.items():
                        print(f"Attribute {key}: {value}")
                # Access system attributes
                sent_timestamp = record.attributes.get("SentTimestamp")
                print(f"Sent at: {sent_timestamp}")

    # Process JSON from SQS
    import json

    record = SqsRecord.from_dict(sqs_event_record)
    data = json.loads(record.message)
    process_message(data)
- _source: EventSource = 'aws:sqs'
- __init__(eventSource: str, eventSourceARN: str, awsRegion: str, messageId: str, receiptHandle: str, body: str, md5OfBody: str, attributes: Dict[str, Any], messageAttributes: Dict[str, Any], md5OfMessageAttributes: str | None = None, **kwargs: Any) -> None
Initialize an SQS queue event record.
- Parameters:
eventSource – Source identifier (always “aws:sqs” for SQS).
eventSourceARN – ARN of the SQS queue.
awsRegion – AWS region where the queue exists.
messageId – Unique message identifier.
receiptHandle – Receipt handle for deleting the message.
body – Message body content (often JSON-encoded).
md5OfBody – MD5 hash of the message body.
attributes –
System attributes containing message metadata. Common attributes:
    {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1702071427000",
        "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
        "ApproximateFirstReceiveTimestamp": "1702071427000"
    }
md5OfMessageAttributes – MD5 hash of message attributes.
messageAttributes –
Custom message attributes. Structure:
    {
        "attribute_name": {
            "stringValue": "value",
            "dataType": "String"
        }
    }
kwargs – Additional fields (ignored, for forward compatibility).
- property message_id: str
Get the unique message ID for this SQS message.
- Returns:
SQS message ID (UUID format). Example: “19dd0b57-b21e-4ac1-bd88-01bbb068cb78”
- property message: str
Get the message body from the SQS message.
- Returns:
Message body. Often JSON-encoded string that needs further parsing.
Example
    record = SqsRecord(...)
    message = record.message  # '{"order_id": 123, "status": "pending"}'

    # Parse JSON if needed
    import json

    data = json.loads(record.message)
    print(data['order_id'])  # 123
- property receipt_handle: str
Get the receipt handle for this SQS message.
The receipt handle is required to delete the message from the queue after processing. Each time a message is received, a new receipt handle is provided.
- Returns:
Receipt handle string (opaque token). Example: “AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a…”
Example
    from boto3 import client

    sqs = client('sqs')
    record = SqsRecord(...)

    # Delete message after processing
    sqs.delete_message(
        QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
        ReceiptHandle=record.receipt_handle
    )
- property queue_arn: str
Get the ARN of the SQS queue that this message came from.
- Returns:
SQS queue ARN. Example: “arn:aws:sqs:us-east-1:123456789012:my-queue”
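The record exposes the queue ARN, while SQS API calls such as `delete_message` expect a queue URL. The sample ARN and the sample URL used in this reference differ only in layout, which suggests the conversion sketched below. This is an assumption that holds for the standard `aws` partition naming shown here; calling the SQS `get_queue_url` API is the authoritative way to resolve a URL. `queue_url_from_arn` is a hypothetical helper.

```python
def queue_url_from_arn(arn: str) -> str:
    """Build a queue URL from an ARN of the form arn:aws:sqs:<region>:<account>:<name>."""
    parts = arn.split(":")
    if len(parts) != 6 or parts[:3] != ["arn", "aws", "sqs"]:
        raise ValueError(f"Not a standard-partition SQS queue ARN: {arn!r}")
    region, account_id, queue_name = parts[3], parts[4], parts[5]
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"


print(queue_url_from_arn("arn:aws:sqs:us-east-1:123456789012:my-queue"))
```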
- property attributes: Dict[str, Any]
Get the system attributes for this SQS message.
System attributes contain metadata about the message such as timestamps, sender information, and receive count.
- Returns:
Dictionary of system attributes:
    {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1702071427000",
        "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
        "ApproximateFirstReceiveTimestamp": "1702071427000"
    }
Example
    record = SqsRecord(...)

    # Check how many times message was received
    receive_count = int(record.attributes.get("ApproximateReceiveCount", "0"))
    if receive_count > 3:
        print("Message has been received multiple times")

    # Get sent timestamp
    sent_ts = record.attributes.get("SentTimestamp")
    print(f"Sent at: {sent_ts}")
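The timestamp attributes arrive as strings holding epoch milliseconds, so printing them directly is rarely useful. Below is a short standalone sketch that converts `SentTimestamp` into an aware UTC `datetime`, working on a plain dictionary shaped like the one above; `sent_datetime` is a hypothetical helper name.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def sent_datetime(attributes: Dict[str, Any]) -> datetime:
    """Convert the SentTimestamp attribute (epoch milliseconds, as a string) to UTC."""
    millis = int(attributes["SentTimestamp"])
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


sample_attributes = {
    "ApproximateReceiveCount": "1",
    "SentTimestamp": "1702071427000",
}
print(sent_datetime(sample_attributes).isoformat())
```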
- property message_attributes: Dict[str, Any]
Get the custom message attributes for this SQS message.
Message attributes are custom metadata set by the message producer.
- Returns:
Dictionary of message attributes with structure:
    {
        "attribute_name": {
            "stringValue": "value",
            "dataType": "String"
        }
    }
Returns empty dict if no attributes present.
Example
    record = SqsRecord(...)
    attrs = record.message_attributes

    # Check for priority attribute
    if "priority" in attrs:
        priority = attrs["priority"]["stringValue"]
        print(f"Priority: {priority}")

    # Extract all string attributes
    for name, attr in attrs.items():
        if attr["dataType"] == "String":
            print(f"{name}: {attr['stringValue']}")
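When a handler only cares about attribute values, the nested structure can be flattened into a simple name-to-value mapping. The sketch below works on a plain dictionary in the documented shape and makes two assumptions worth stating: numeric attributes carry their value in `stringValue` with a `dataType` that starts with "Number", and binary attributes are skipped. `simplify_sqs_attributes` is a hypothetical helper, not part of this package.

```python
from typing import Any, Dict, Union


def simplify_sqs_attributes(attrs: Dict[str, Dict[str, Any]]) -> Dict[str, Union[str, float]]:
    """Flatten {"name": {"stringValue": ..., "dataType": ...}} into {"name": value}."""
    simple: Dict[str, Union[str, float]] = {}
    for name, attr in attrs.items():
        data_type = attr.get("dataType", "")
        value = attr.get("stringValue")
        if value is None:
            continue  # e.g. binary attributes, which this sketch ignores
        # Custom type labels such as "Number.int" still start with "Number".
        simple[name] = float(value) if data_type.startswith("Number") else value
    return simple


sample = {
    "priority": {"stringValue": "high", "dataType": "String"},
    "retries": {"stringValue": "3", "dataType": "Number"},
}
print(simplify_sqs_attributes(sample))
```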
- property md5_of_body: str
Get the MD5 hash of the message body.
Can be used to verify message integrity.
- Returns:
MD5 hash string. Example: “098f6bcd4621d373cade4e832627b4f6”
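As an illustration of the integrity check mentioned above, the digest can be recomputed from the body with `hashlib` and compared against the reported value. This sketch assumes the hash is taken over the UTF-8 bytes of the body and is hex-encoded, which matches the example value shown (the MD5 of the string "test"). `body_matches_md5` is a hypothetical helper.

```python
import hashlib


def body_matches_md5(body: str, expected_md5: str) -> bool:
    """Return True if the hex MD5 digest of the UTF-8 body equals expected_md5."""
    digest = hashlib.md5(body.encode("utf-8")).hexdigest()
    return digest == expected_md5.lower()


print(body_matches_md5("test", "098f6bcd4621d373cade4e832627b4f6"))
```

MD5 is adequate here as a corruption check only; it should not be relied on for protection against deliberate tampering.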