Events#

Base class for events#

AWS Lambda event record base classes and utilities.

This module provides base classes for handling AWS Lambda event records from various sources (Kinesis, SQS, SNS). It implements a factory pattern that automatically instantiates the correct record type based on the event source.

class core_aws.services.lambdas.events.base.EventSource(*values)[source]#

Bases: str, Enum

AWS event source identifiers for Lambda function triggers.

These values correspond to the “eventSource” or “EventSource” field in Lambda event records.

KINESIS_DATA_STREAM = 'aws:kinesis'#
SNS_TOPIC = 'aws:sns'#
SQS_QUEUE = 'aws:sqs'#
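
Because EventSource derives from both str and Enum, its members compare equal to the raw strings found in event records. The following is a minimal sketch of that behavior. It uses a local stand-in enum that mirrors the three documented values so it runs without the package installed, and `source_of` is a hypothetical helper, not part of the library.

```python
from enum import Enum
from typing import Any, Dict, Optional


class EventSource(str, Enum):
    """Local stand-in mirroring the documented values."""

    KINESIS_DATA_STREAM = "aws:kinesis"
    SNS_TOPIC = "aws:sns"
    SQS_QUEUE = "aws:sqs"


def source_of(record: Dict[str, Any]) -> Optional[EventSource]:
    """Hypothetical helper: map a raw record to an EventSource, or None."""
    # SQS and Kinesis use "eventSource"; SNS uses "EventSource".
    raw = record.get("eventSource") or record.get("EventSource")
    try:
        return EventSource(raw)
    except ValueError:
        # Raised for values that are not members of the enum (including None).
        return None
```

Looking a member up by value (`EventSource("aws:sqs")`) raises ValueError for unknown sources, which is why the helper catches it and returns None.
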
class core_aws.services.lambdas.events.base.EventRecord[source]#

Bases: ABC

Abstract base class for AWS Lambda event records.

This class provides a factory pattern for creating specific record types (SqsRecord, SnsRecord, KinesisRecord) based on the event source. Subclasses are automatically registered when defined with a _source attribute.

The factory pattern allows automatic deserialization of Lambda event records:

Example

# Lambda handler receiving SQS event
def lambda_handler(event, context):
    for raw_record in event['Records']:
        # Factory automatically creates SqsRecord instance
        record = EventRecord.from_dict(raw_record)

        # Access common interface
        print(f"Message ID: {record.message_id}")
        print(f"Body: {record.message}")

        # Type-specific access
        if isinstance(record, SqsRecord):
            print(f"Receipt Handle: {record.receipt_handle}")

# Or handle unknown event sources gracefully
record = EventRecord.from_dict(raw_record)
if isinstance(record, dict):
    # Unknown source - got raw dict back
    print(f"Unknown event source: {record.get('eventSource')}")
else:
    # Known source - got EventRecord subclass
    process_message(record.message)
Subclass Implementation:

To create a new event record type:

class CustomRecord(EventRecord):
    _source = EventSource.SQS_QUEUE  # Register with factory

    def __init__(self, messageId: str, body: str, **kwargs):
        self._message_id = messageId
        self._body = body

    @property
    def message_id(self) -> str:
        return self._message_id

    @property
    def message(self) -> str:
        return self._body
_subclasses: Dict[str, Callable[[...], EventRecord]] = {EventSource.KINESIS_DATA_STREAM: <class 'core_aws.services.lambdas.events.kinesis_stream.KinesisRecord'>, EventSource.SNS_TOPIC: <class 'core_aws.services.lambdas.events.sns_topic.SnsRecord'>, EventSource.SQS_QUEUE: <class 'core_aws.services.lambdas.events.sqs_queue.SqsRecord'>}#
_source: EventSource#
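
The registry above maps each event source to its record class. One common way to populate such a registry is `__init_subclass__`; the sketch below illustrates that approach under the assumption that registration happens at class-definition time. It is not the library's actual implementation, and `Record` and `QueueRecord` are hypothetical stand-ins for EventRecord and SqsRecord.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Type, Union


class Record(ABC):
    """Stand-in for EventRecord; illustrative only."""

    _registry: Dict[str, Type["Record"]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Register only classes that declare their own _source.
        source = cls.__dict__.get("_source")
        if source is not None:
            Record._registry[source] = cls

    @property
    @abstractmethod
    def message(self) -> str: ...

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> Union["Record", Dict[str, Any]]:
        source = raw.get("eventSource") or raw.get("EventSource")
        target = cls._registry.get(source)
        # Unknown sources fall through unchanged, as documented for from_dict.
        return target(**raw) if target else raw


class QueueRecord(Record):
    _source = "aws:sqs"

    def __init__(self, body: str, **kwargs: Any) -> None:
        # Extra keys such as eventSource are absorbed by **kwargs.
        self._body = body

    @property
    def message(self) -> str:
        return self._body
```

With this design, defining a second class with the same `_source` would replace the earlier registration, so each source should map to one class.
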
abstract property message_id: str#

Get the unique identifier for this message/record.

Returns:

Message ID string (format varies by event source).

Example

  • SQS: “19dd0b57-b21e-4ac1-bd88-01bbb068cb78”

  • SNS: “95df01b4-ee98-5cb9-9903-4c221d41eb5e”

  • Kinesis: Sequence number

abstract property message: str#

Get the message payload/body.

Returns:

Message content as string (may be JSON-encoded).

Example

  • SQS: Body field content

  • SNS: Message field content

  • Kinesis: Base64-decoded data

classmethod from_dict(message: Dict[str, Any]) → EventRecord | Dict[str, Any][source]#

Factory method that creates the appropriate EventRecord subclass from raw event data. It inspects the “eventSource” or “EventSource” field to determine the correct record type, then instantiates and returns it. If the event source is not recognized, it returns the raw dictionary unchanged.

Parameters:

message – Raw event record dictionary from a Lambda event. Must contain an “eventSource” or “EventSource” field.

Returns:

  • EventRecord subclass instance if source is recognized

  • Raw dict if event source is unknown/unsupported

Example

# SQS event record
sqs_record = {
    "eventSource": "aws:sqs",
    "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
    "body": '{"order_id": 123}',
    "receiptHandle": "AQEBwJ...",
    # ... other fields
}

record = EventRecord.from_dict(sqs_record)
# Returns: SqsRecord instance
print(record.message_id)  # "19dd0b57-b21e-4ac1-bd88-01bbb068cb78"
print(record.message)     # '{"order_id": 123}'

# Unknown event source
unknown_record = {
    "eventSource": "aws:unknown",
    "data": "something"
}

record = EventRecord.from_dict(unknown_record)
# Returns: Dict (unchanged)
print(record)  # {'eventSource': 'aws:unknown', 'data': 'something'}

# Batch processing in Lambda
import logging

logger = logging.getLogger(__name__)

def lambda_handler(event, context):
    for raw_record in event['Records']:
        record = EventRecord.from_dict(raw_record)

        if isinstance(record, dict):
            # Unrecognized source
            logger.warning(f"Unknown source: {record.get('eventSource')}")
            continue

        # Process recognized record types
        process_message(record.message_id, record.message)

Kinesis Event#

AWS Lambda Kinesis Data Stream event record.

This module provides a wrapper for Kinesis Data Stream records received by AWS Lambda functions, handling automatic base64 decoding of the data payload.

class core_aws.services.lambdas.events.kinesis_stream.KinesisRecord(kinesis: Dict[str, Any], eventSource: str, eventVersion: str, eventID: str, eventName: str, invokeIdentityArn: str, awsRegion: str, eventSourceARN: str, **kwargs: Any)[source]#

Bases: EventRecord

Represents a record from an AWS Kinesis Data Stream in a Lambda event. Automatically decodes base64-encoded Kinesis data and provides access to record metadata including sequence number, partition key, and event details.

Example

# Lambda handler receiving Kinesis stream event
def lambda_handler(event, context):
    for raw_record in event['Records']:
        record = EventRecord.from_dict(raw_record)

        if isinstance(record, KinesisRecord):
            # Access decoded message
            print(f"Message: {record.message}")

            # Access Kinesis metadata
            print(f"Sequence: {record.sequence_number}")
            print(f"Partition Key: {record.partition_key}")
            print(f"Schema Version: {record._kinesis['kinesisSchemaVersion']}")

# Process JSON from Kinesis
import json
record = KinesisRecord.from_dict(kinesis_event_record)
data = json.loads(record.message)
process_event(data)
_source: EventSource = 'aws:kinesis'#
__init__(kinesis: Dict[str, Any], eventSource: str, eventVersion: str, eventID: str, eventName: str, invokeIdentityArn: str, awsRegion: str, eventSourceARN: str, **kwargs: Any) → None[source]#

Initialize a Kinesis Data Stream event record.

Parameters:
  • kinesis

    Kinesis-specific data containing the record payload and metadata. Structure:

    {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "user-123",
        "sequenceNumber": "49647175778160097793486557372840800878012000746547970050",
        "data": "eyJldmVudCI6ICJsb2dpbiJ9",  # base64-encoded
        "approximateArrivalTimestamp": 1702071427.66
    }
    

  • eventSource – Source identifier (always “aws:kinesis” for Kinesis).

  • eventVersion – Event format version.

  • eventID – Unique event identifier (shard ID:sequence number).

  • eventName – Event name (typically “aws:kinesis:record”).

  • invokeIdentityArn – ARN of the identity invoking the Lambda.

  • awsRegion – AWS region where the stream exists.

  • eventSourceARN – ARN of the Kinesis stream.

  • kwargs – Additional fields (ignored, for forward compatibility).

property message_id: str#

Get the unique event ID for this Kinesis record.

Returns:

Event ID in format: “{shardId}:{sequenceNumber}”. Example: “shardId-000000000001:49590338271490256608559692538361571095921575989136588898”
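
Since the event ID joins the shard ID and the sequence number with a colon, the two parts can be recovered with a single split. A minimal sketch, where `split_event_id` is a hypothetical helper and not part of the library:

```python
from typing import Tuple


def split_event_id(event_id: str) -> Tuple[str, str]:
    """Hypothetical helper: split '<shardId>:<sequenceNumber>' into its parts."""
    shard_id, sep, sequence_number = event_id.partition(":")
    if not sep:
        raise ValueError(f"Unexpected event ID format: {event_id!r}")
    return shard_id, sequence_number


shard_id, sequence_number = split_event_id(
    "shardId-000000000001:49590338271490256608559692538361571095921575989136588898"
)
```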

property message: str#

Get the decoded message data from the Kinesis record. Automatically decodes the base64-encoded data field and returns it as a UTF-8 string.

Returns:

Decoded message content. Often a JSON-encoded string that needs further parsing.

Example

record = KinesisRecord(...)
message = record.message  # '{"event": "login", "user_id": 123}'

# Parse JSON if needed
import json
data = json.loads(record.message)
print(data['event'])  # "login"
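
For reference, the decoding step described above is roughly equivalent to the following standard-library calls. This is a sketch that assumes a UTF-8 text payload; the property's actual implementation may differ. The sample value is the "data" field shown in the constructor documentation.

```python
import base64
import json

# Sample value taken from the "data" field shown in the constructor docs.
encoded = "eyJldmVudCI6ICJsb2dpbiJ9"

# Base64-decode to bytes, then interpret the bytes as UTF-8 text.
decoded = base64.b64decode(encoded).decode("utf-8")

# The text happens to be JSON here, so it can be parsed further.
payload = json.loads(decoded)
```

Binary payloads that are not valid UTF-8 would raise UnicodeDecodeError at the `decode` step, so producers and consumers need to agree on the encoding.
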
property sequence_number: str#

Get the sequence number for this Kinesis record.

Returns:

Sequence number string. Used for ordering and checkpointing. Example: “49647175778160097793486557372840800878012000746547970050”

property partition_key: str#

Get the partition key for this Kinesis record.

Returns:

Partition key that determined which shard received this record. Example: “user-123”

property approximate_arrival_timestamp: float#

Get the approximate arrival timestamp for this record.

Returns:

Unix timestamp (seconds since epoch) when record arrived in the stream. Example: 1702071427.66
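
The float can be turned into a timezone-aware datetime with the standard library. A minimal sketch, where `arrival_datetime` is a hypothetical helper:

```python
from datetime import datetime, timezone


def arrival_datetime(timestamp: float) -> datetime:
    """Hypothetical helper: convert epoch seconds to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


arrived = arrival_datetime(1702071427.66)
```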


SNS Event#

AWS Lambda SNS topic event record.

This module provides a wrapper for SNS topic records received by AWS Lambda functions, including access to message content, attributes, and metadata.

class core_aws.services.lambdas.events.sns_topic.SnsRecord(EventSource: str, EventSubscriptionArn: str, EventVersion: str, Sns: Dict[str, Any], **kwargs: Any)[source]#

Bases: EventRecord

Represents a record from an AWS SNS topic in a Lambda event.

Provides access to SNS message content, attributes, subject, timestamp, and other metadata from notifications published to SNS topics.

Example

# Lambda handler receiving SNS event
def lambda_handler(event, context):
    for raw_record in event['Records']:
        record = EventRecord.from_dict(raw_record)

        if isinstance(record, SnsRecord):
            # Access message content
            print(f"Message: {record.message}")
            print(f"Subject: {record.subject}")

            # Access SNS metadata
            print(f"Topic ARN: {record.topic_arn}")
            print(f"Timestamp: {record.timestamp}")

            # Access message attributes
            if record.message_attributes:
                for key, value in record.message_attributes.items():
                    print(f"Attribute {key}: {value}")

# Process JSON from SNS
import json
record = SnsRecord.from_dict(sns_event_record)
data = json.loads(record.message)
process_notification(data)
_source: EventSource = 'aws:sns'#
__init__(EventSource: str, EventSubscriptionArn: str, EventVersion: str, Sns: Dict[str, Any], **kwargs: Any) → None[source]#

Initialize an SNS topic event record.

Parameters:
  • EventSource – Source identifier (always “aws:sns” for SNS).

  • EventSubscriptionArn – ARN of the SNS subscription that triggered this event.

  • EventVersion – Event format version.

  • Sns

    SNS-specific data containing the message and metadata. Structure:

    {
        "Type": "Notification",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
        "Subject": "Order Notification",
        "Message": '{"order_id": 123, "status": "shipped"}',
        "Timestamp": "2024-01-01T12:00:00.000Z",
        "SignatureVersion": "1",
        "Signature": "...",
        "SigningCertUrl": "https://...",
        "UnsubscribeUrl": "https://...",
        "MessageAttributes": {
            "priority": {
                "Type": "String",
                "Value": "high"
            }
        }
    }
    

  • kwargs – Additional fields (ignored, for forward compatibility).

property message_id: str#

Get the unique message ID for this SNS notification.

Returns:

SNS message ID (UUID format). Example: “95df01b4-ee98-5cb9-9903-4c221d41eb5e”

property message: str#

Get the message content from the SNS notification.

Returns:

Message body. Often a JSON-encoded string that needs further parsing.

Example

record = SnsRecord(...)
message = record.message  # '{"order_id": 123, "status": "shipped"}'

# Parse JSON if needed
import json
data = json.loads(record.message)
print(data['order_id'])  # 123
property topic_arn: str#

Get the ARN of the SNS topic that published this message.

Returns:

SNS topic ARN. Example: “arn:aws:sns:us-east-1:123456789012:my-topic”

property subject: str | None#

Get the subject of the SNS notification (if provided).

Returns:

Subject string or None if no subject was provided. Example: “Order Notification”

property timestamp: str#

Get the timestamp when the message was published.

Returns:

ISO 8601 timestamp string. Example: “2024-01-01T12:00:00.000Z”
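
Because the property returns a string, callers that need date arithmetic have to parse it themselves. A minimal sketch, assuming the timestamp always has the millisecond-precision UTC form shown in the example; `parse_sns_timestamp` is a hypothetical helper:

```python
from datetime import datetime, timezone


def parse_sns_timestamp(value: str) -> datetime:
    """Hypothetical helper: parse the 'YYYY-MM-DDTHH:MM:SS.fffZ' form shown above."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    # The trailing "Z" denotes UTC, so attach that timezone explicitly.
    return parsed.replace(tzinfo=timezone.utc)


published = parse_sns_timestamp("2024-01-01T12:00:00.000Z")
```

`strptime` is used here rather than `datetime.fromisoformat` because older Python versions do not accept the trailing "Z" in `fromisoformat`.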

property message_attributes: Dict[str, Any]#

Get the message attributes (custom metadata) from the SNS notification.

Returns:

Dictionary of message attributes with structure:

{
    "attribute_name": {
        "Type": "String" | "Number" | "Binary",
        "Value": "attribute_value"
    }
}

Returns empty dict if no attributes present.

Example

record = SnsRecord(...)
attrs = record.message_attributes

if "priority" in attrs:
    priority = attrs["priority"]["Value"]
    print(f"Priority: {priority}")

# Extract all string attributes
for name, attr in attrs.items():
    if attr["Type"] == "String":
        print(f"{name}: {attr['Value']}")

SQS Event#

AWS Lambda SQS queue event record.

This module provides a wrapper for SQS queue records received by AWS Lambda functions, including access to message body, attributes, and receipt handles.

class core_aws.services.lambdas.events.sqs_queue.SqsRecord(eventSource: str, eventSourceARN: str, awsRegion: str, messageId: str, receiptHandle: str, body: str, md5OfBody: str, attributes: Dict[str, Any], messageAttributes: Dict[str, Any], md5OfMessageAttributes: str | None = None, **kwargs: Any)[source]#

Bases: EventRecord

Represents a record from an AWS SQS queue in a Lambda event.

Provides access to SQS message body, attributes, receipt handle, and metadata from messages pulled from SQS queues. The receipt handle can be used to delete the message after processing.

Example

# Lambda handler receiving SQS event
def lambda_handler(event, context):
    for raw_record in event['Records']:
        record = EventRecord.from_dict(raw_record)

        if isinstance(record, SqsRecord):
            # Access message content
            print(f"Message: {record.message}")
            print(f"Message ID: {record.message_id}")

            # Access SQS metadata
            print(f"Queue ARN: {record.queue_arn}")
            print(f"Receipt Handle: {record.receipt_handle}")

            # Access message attributes
            if record.message_attributes:
                for key, value in record.message_attributes.items():
                    print(f"Attribute {key}: {value}")

            # Access system attributes
            sent_timestamp = record.attributes.get("SentTimestamp")
            print(f"Sent at: {sent_timestamp}")

# Process JSON from SQS
import json
record = SqsRecord.from_dict(sqs_event_record)
data = json.loads(record.message)
process_message(data)
_source: EventSource = 'aws:sqs'#
__init__(eventSource: str, eventSourceARN: str, awsRegion: str, messageId: str, receiptHandle: str, body: str, md5OfBody: str, attributes: Dict[str, Any], messageAttributes: Dict[str, Any], md5OfMessageAttributes: str | None = None, **kwargs: Any) → None[source]#

Initialize an SQS queue event record.

Parameters:
  • eventSource – Source identifier (always “aws:sqs” for SQS).

  • eventSourceARN – ARN of the SQS queue.

  • awsRegion – AWS region where the queue exists.

  • messageId – Unique message identifier.

  • receiptHandle – Receipt handle for deleting the message.

  • body – Message body content (often JSON-encoded).

  • md5OfBody – MD5 hash of the message body.

  • attributes

    System attributes containing message metadata. Common attributes:

    {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1702071427000",
        "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
        "ApproximateFirstReceiveTimestamp": "1702071427000"
    }
    

  • md5OfMessageAttributes – MD5 hash of message attributes.

  • messageAttributes

    Custom message attributes. Structure:

    {
        "attribute_name": {
            "stringValue": "value",
            "dataType": "String"
        }
    }
    

  • kwargs – Additional fields (ignored, for forward compatibility).

property message_id: str#

Get the unique message ID for this SQS message.

Returns:

SQS message ID (UUID format). Example: “19dd0b57-b21e-4ac1-bd88-01bbb068cb78”

property message: str#

Get the message body from the SQS message.

Returns:

Message body. Often a JSON-encoded string that needs further parsing.

Example

record = SqsRecord(...)
message = record.message  # '{"order_id": 123, "status": "pending"}'

# Parse JSON if needed
import json
data = json.loads(record.message)
print(data['order_id'])  # 123
property receipt_handle: str#

Get the receipt handle for this SQS message.

The receipt handle is required to delete the message from the queue after processing. Each time a message is received, a new receipt handle is provided.

Returns:

Receipt handle string (opaque token). Example: “AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a…”

Example

from boto3 import client

sqs = client('sqs')
record = SqsRecord(...)

# Delete message after processing
sqs.delete_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    ReceiptHandle=record.receipt_handle
)
property queue_arn: str#

Get the ARN of the SQS queue that this message came from.

Returns:

SQS queue ARN. Example: “arn:aws:sqs:us-east-1:123456789012:my-queue”
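
SQS API calls such as `delete_message` take a queue URL, while the record exposes the queue ARN. The sketch below derives one from the other. It assumes the standard `https://sqs.<region>.amazonaws.com/<account>/<name>` endpoint layout, which may not hold for every partition or for custom endpoints; `queue_url_from_arn` is a hypothetical helper, and calling the SQS `get_queue_url` API is the more robust option.

```python
def queue_url_from_arn(queue_arn: str) -> str:
    """Hypothetical helper: build a queue URL from 'arn:aws:sqs:<region>:<account>:<name>'."""
    parts = queue_arn.split(":")
    if len(parts) != 6 or parts[2] != "sqs":
        raise ValueError(f"Not an SQS queue ARN: {queue_arn!r}")
    _, _, _, region, account_id, queue_name = parts
    # Assumes the standard commercial-partition endpoint layout.
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"


queue_url = queue_url_from_arn("arn:aws:sqs:us-east-1:123456789012:my-queue")
```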

property attributes: Dict[str, Any]#

Get the system attributes for this SQS message.

System attributes contain metadata about the message such as timestamps, sender information, and receive count.

Returns:

Dictionary of system attributes:

{
    "ApproximateReceiveCount": "1",
    "SentTimestamp": "1702071427000",
    "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
    "ApproximateFirstReceiveTimestamp": "1702071427000"
}

Example

record = SqsRecord(...)

# Check how many times message was received
receive_count = int(record.attributes.get("ApproximateReceiveCount", "0"))
if receive_count > 3:
    print("Message has been received multiple times")

# Get sent timestamp
sent_ts = record.attributes.get("SentTimestamp")
print(f"Sent at: {sent_ts}")
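
The timestamp attributes are epoch milliseconds encoded as strings, so they need converting before use as dates. A minimal sketch, where `sent_datetime` is a hypothetical helper:

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def sent_datetime(attributes: Dict[str, Any]) -> Optional[datetime]:
    """Hypothetical helper: convert SentTimestamp (epoch ms, as a string) to UTC."""
    raw = attributes.get("SentTimestamp")
    if raw is None:
        return None
    # Divide by 1000 to go from milliseconds to seconds.
    return datetime.fromtimestamp(int(raw) / 1000, tz=timezone.utc)


sent = sent_datetime({"SentTimestamp": "1702071427000"})
```
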
property message_attributes: Dict[str, Any]#

Get the custom message attributes for this SQS message.

Message attributes are custom metadata set by the message producer.

Returns:

Dictionary of message attributes with structure:

{
    "attribute_name": {
        "stringValue": "value",
        "dataType": "String"
    }
}

Returns empty dict if no attributes present.

Example

record = SqsRecord(...)
attrs = record.message_attributes

# Check for priority attribute
if "priority" in attrs:
    priority = attrs["priority"]["stringValue"]
    print(f"Priority: {priority}")

# Extract all string attributes
for name, attr in attrs.items():
    if attr["dataType"] == "String":
        print(f"{name}: {attr['stringValue']}")
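
When only the text values matter, the nested structure can be flattened into a plain name-to-value mapping. A minimal sketch, where `flatten_string_attributes` is a hypothetical helper; it keeps String and Number attributes (both carried in "stringValue") and skips Binary ones:

```python
from typing import Any, Dict


def flatten_string_attributes(attrs: Dict[str, Dict[str, Any]]) -> Dict[str, str]:
    """Hypothetical helper: keep String/Number attributes as name -> stringValue."""
    flat: Dict[str, str] = {}
    for name, attr in attrs.items():
        data_type = attr.get("dataType", "")
        # Custom types look like "String.custom" or "Number.int"; match on the base type.
        if data_type.split(".")[0] in ("String", "Number") and "stringValue" in attr:
            flat[name] = attr["stringValue"]
    return flat
```

Number values stay as strings here; converting them with `int` or `float` is left to the caller, since the right numeric type depends on the producer.
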
property md5_of_body: str#

Get the MD5 hash of the message body.

Can be used to verify message integrity.

Returns:

MD5 hash string. Example: “098f6bcd4621d373cade4e832627b4f6”
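
An integrity check amounts to recomputing the digest of the body and comparing it with the reported value. A minimal sketch, assuming the body is UTF-8 text; `body_matches_md5` is a hypothetical helper:

```python
import hashlib


def body_matches_md5(body: str, md5_of_body: str) -> bool:
    """Hypothetical helper: compare the body's MD5 digest with the reported one."""
    digest = hashlib.md5(body.encode("utf-8")).hexdigest()
    # hexdigest() is lowercase, so normalize the reported value before comparing.
    return digest == md5_of_body.lower()
```

The example hash shown above is the MD5 digest of the string "test", so `body_matches_md5("test", "098f6bcd4621d373cade4e832627b4f6")` is true.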
