Source code for core_aws.services.lambdas.events.kinesis_stream

# -*- coding: utf-8 -*-

"""
AWS Lambda Kinesis Data Stream event record.

This module provides a wrapper for Kinesis Data Stream records received by
AWS Lambda functions, handling automatic base64 decoding of the data payload.
"""

import base64
from typing import Any
from typing import Dict

from .base import EventRecord
from .base import EventSource


[docs] class KinesisRecord(EventRecord): """ Represents a record from AWS Kinesis Data Stream in a Lambda event. Automatically decodes base64-encoded Kinesis data and provides access to record metadata including sequence number, partition key, and event details. Example: .. code-block:: python # Lambda handler receiving Kinesis stream event def lambda_handler(event, context): for raw_record in event['Records']: record = EventRecord.from_dict(raw_record) if isinstance(record, KinesisRecord): # Access decoded message print(f"Message: {record.message}") # Access Kinesis metadata print(f"Sequence: {record.sequence_number}") print(f"Partition Key: {record.partition_key}") print(f"Shard ID: {record._kinesis['kinesisSchemaVersion']}") # Process JSON from Kinesis import json record = KinesisRecord.from_dict(kinesis_event_record) data = json.loads(record.message) process_event(data) .. """ _source = EventSource.KINESIS_DATA_STREAM # noinspection PyPep8Naming
[docs] def __init__( self, kinesis: Dict[str, Any], eventSource: str, eventVersion: str, eventID: str, eventName: str, invokeIdentityArn: str, awsRegion: str, eventSourceARN: str, **kwargs: Any ) -> None: """ Initialize a Kinesis Data Stream event record. :param kinesis: Kinesis-specific data containing the record payload and metadata. Structure: .. code-block:: python { "kinesisSchemaVersion": "1.0", "partitionKey": "user-123", "sequenceNumber": "49647175778160097793486557372840800878012000746547970050", "data": "eyJldmVudCI6ICJsb2dpbiJ9", # base64-encoded "approximateArrivalTimestamp": 1702071427.66 } .. :param eventSource: Source identifier (always "aws:kinesis" for Kinesis). :param eventVersion: Event format version. :param eventID: Unique event identifier (shard ID:sequence number). :param eventName: Event name (typically "aws:kinesis:record"). :param invokeIdentityArn: ARN of the identity invoking the Lambda. :param awsRegion: AWS region where the stream exists. :param eventSourceARN: ARN of the Kinesis stream. :param kwargs: Additional fields (ignored, for forward compatibility). """ self._aws_region = awsRegion self._invoke_identity_arn = invokeIdentityArn self._kinesis = kinesis self._event_name = eventName self._event_source = eventSource self._event_source_arn = eventSourceARN self._event_version = eventVersion self._event_id = eventID
@property def message_id(self) -> str: """ Get the unique event ID for this Kinesis record. :return: Event ID in format: "shardId-{timestamp}-{sequenceNumber}". Example: "shardId-000000000001:49590338271490256608559692538361571095921575989136588898" """ return self._event_id @property def message(self) -> str: """ Get the decoded message data from the Kinesis record. Automatically decodes the base64-encoded data field and returns it as a UTF-8 string. :return: Decoded message content. Often JSON-encoded string that needs further parsing. Example: .. code-block:: python record = KinesisRecord(...) message = record.message # '{"event": "login", "user_id": 123}' # Parse JSON if needed import json data = json.loads(record.message) print(data['event']) # "login" .. """ return base64.b64decode(self._kinesis["data"]).decode() @property def sequence_number(self) -> str: """ Get the sequence number for this Kinesis record. :return: Sequence number string. Used for ordering and checkpointing. Example: "49647175778160097793486557372840800878012000746547970050" """ return self._kinesis["sequenceNumber"] @property def partition_key(self) -> str: """ Get the partition key for this Kinesis record. :return: Partition key that determined which shard received this record. Example: "user-123" """ return self._kinesis["partitionKey"] @property def approximate_arrival_timestamp(self) -> float: """ Get the approximate arrival timestamp for this record. :return: Unix timestamp (seconds since epoch) when record arrived in the stream. Example: 1702071427.66 """ return self._kinesis["approximateArrivalTimestamp"]