Source code for core_aws.services.lambdas.events.base
# -*- coding: utf-8 -*-
"""
AWS Lambda event record base classes and utilities.
This module provides base classes for handling AWS Lambda event records from
various sources (Kinesis, SQS, SNS). It implements a factory pattern that
automatically instantiates the correct record type based on the event source.
"""
from __future__ import annotations
from abc import ABC
from abc import abstractmethod
from enum import Enum
from typing import Any
from typing import Callable
from typing import Dict
[docs]
class EventSource(str, Enum):
"""
AWS event source identifiers for Lambda function triggers.
These values correspond to the "eventSource" or "EventSource" field
in Lambda event records.
"""
KINESIS_DATA_STREAM = "aws:kinesis"
SNS_TOPIC = "aws:sns"
SQS_QUEUE = "aws:sqs"
[docs]
class EventRecord(ABC):
"""
Abstract base class for AWS Lambda event records.
This class provides a factory pattern for creating specific record types
(SqsRecord, SnsRecord, KinesisRecord) based on the event source. Subclasses
are automatically registered when defined with a `_source` attribute.
The factory pattern allows automatic deserialization of Lambda event records:
Example:
.. code-block:: python
# Lambda handler receiving SQS event
def lambda_handler(event, context):
for raw_record in event['Records']:
# Factory automatically creates SqsRecord instance
record = EventRecord.from_dict(raw_record)
# Access common interface
print(f"Message ID: {record.message_id}")
print(f"Body: {record.message}")
# Type-specific access
if isinstance(record, SqsRecord):
print(f"Receipt Handle: {record._receipt_handle}")
# Or handle unknown event sources gracefully
record = EventRecord.from_dict(raw_record)
if isinstance(record, dict):
# Unknown source - got raw dict back
print(f"Unknown event source: {record.get('eventSource')}")
else:
# Known source - got EventRecord subclass
process_message(record.message)
..
Subclass Implementation:
To create a new event record type:
.. code-block:: python
class CustomRecord(EventRecord):
_source = EventSource.SQS_QUEUE # Register with factory
def __init__(self, messageId: str, body: str, **kwargs):
self._message_id = messageId
self._body = body
@property
def message_id(self) -> str:
return self._message_id
@property
def message(self) -> str:
return self._body
..
"""
_subclasses: Dict[str, Callable[..., EventRecord]] = {}
_source: EventSource
def __init_subclass__(cls, **kwargs: Any) -> None:
"""
Register subclass in the factory registry.
Automatically called when a subclass is defined. Registers the subclass
with its associated event source for use in the factory pattern.
:param kwargs: Additional keyword arguments passed to parent __init_subclass__.
"""
super().__init_subclass__(**kwargs)
cls._subclasses[cls._source] = cls
@property
@abstractmethod
def message_id(self) -> str:
"""
Get the unique identifier for this message/record.
:return: Message ID string (format varies by event source).
Example:
- SQS: "19dd0b57-b21e-4ac1-bd88-01bbb068cb78"
- SNS: "95df01b4-ee98-5cb9-9903-4c221d41eb5e"
- Kinesis: Sequence number
"""
@property
@abstractmethod
def message(self) -> str:
"""
Get the message payload/body.
:return: Message content as string (may be JSON-encoded).
Example:
- SQS: Body field content
- SNS: Message field content
- Kinesis: Base64-decoded data
"""
[docs]
@classmethod
def from_dict(
cls,
message: Dict[str, Any],
) -> EventRecord | Dict[str, Any]:
"""
Factory method to create appropriate EventRecord subclass
from raw event data. Inspects the "eventSource" or "EventSource" field
to determine the correct record type, then instantiates and returns
it. If the event source is not recognized, returns the raw
dictionary unchanged.
:param message:
Raw event record dictionary from Lambda event. Must contain
"eventSource" or "EventSource" field.
:return:
- EventRecord subclass instance if source is recognized
- Raw dict if event source is unknown/unsupported
Example:
.. code-block:: python
# SQS event record
sqs_record = {
"eventSource": "aws:sqs",
"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
"body": '{"order_id": 123}',
"receiptHandle": "AQEBwJ...",
# ... other fields
}
record = EventRecord.from_dict(sqs_record)
# Returns: SqsRecord instance
print(record.message_id) # "19dd0b57-b21e-4ac1-bd88-01bbb068cb78"
print(record.message) # '{"order_id": 123}'
# Unknown event source
unknown_record = {
"eventSource": "aws:unknown",
"data": "something"
}
record = EventRecord.from_dict(unknown_record)
# Returns: Dict (unchanged)
print(record) # {"eventSource": "aws:unknown", "data": "something"}
# Batch processing in Lambda
def lambda_handler(event, context):
for raw_record in event['Records']:
record = EventRecord.from_dict(raw_record)
if isinstance(record, dict):
# Unrecognized source
logger.warning(f"Unknown source: {record.get('eventSource')}")
continue
# Process recognized record types
process_message(record.message_id, record.message)
..
"""
# Try to get event source from either field name variation
event_source = message.get("eventSource", "") or message.get("EventSource", "")
cls_ = cls._subclasses.get(event_source)
if not cls_:
return message
return cls_(**message)