Source code for core_aws.cdc.targets.kinesis

# -*- coding: utf-8 -*-

"""
Kinesis Data Stream CDC target implementation.

This module provides a CDC (Change Data Capture) target that sends records
to AWS Kinesis Data Streams for real-time data streaming and processing.
"""

from typing import List

from core_cdc.base import Record
from core_cdc.targets.base import ITarget

from core_aws.services.kinesis.client import KinesisClient


[docs] class KinesisDataStreamTarget(ITarget): """ CDC target for sending records to AWS Kinesis Data Streams enabling real-time data processing and analytics pipelines. **Example:** .. code-block:: python from core_aws.cdc.targets import KinesisDataStreamTarget target = KinesisDataStreamTarget( aws_region="us-east-1", stream_name="my-data-stream", partition_key="user_id" ) target.save(records) .. """ client: KinesisClient
[docs] def __init__( self, aws_region: str, stream_name: str, partition_key: str, **kwargs ) -> None: """ Initialize the Kinesis Data Stream target. :param aws_region: AWS region where the Kinesis stream is located. :param stream_name: Name of the target Kinesis Data Stream. :param partition_key: Partition key used to distribute records across shards. :param kwargs: Additional keyword arguments passed to the base ITarget class. """ super().__init__(**kwargs) self.aws_region = aws_region self.client = KinesisClient(region=self.aws_region) self.partition_key = partition_key self.stream_name = stream_name
[docs] def _save(self, records: List[Record], **kwargs): """ Save CDC records to the Kinesis Data Stream. :param records: List of CDC records to send to the stream. :param kwargs: Additional keyword arguments passed to the Kinesis client. :return: Response from the Kinesis clients send_records operation. """ return self.client.send_records( records=[x.to_json() for x in records], stream_name=self.stream_name, partition_key=self.partition_key, **kwargs )