Source code for core_aws.cdc.targets.kinesis
# -*- coding: utf-8 -*-
"""
Kinesis Data Stream CDC target implementation.
This module provides a CDC (Change Data Capture) target that sends records
to AWS Kinesis Data Streams for real-time data streaming and processing.
"""
from typing import List
from core_cdc.base import Record
from core_cdc.targets.base import ITarget
from core_aws.services.kinesis.client import KinesisClient
[docs]
class KinesisDataStreamTarget(ITarget):
    """
    CDC target that forwards change records to an AWS Kinesis Data Stream,
    so they can feed real-time processing and analytics pipelines.

    **Example:**

    .. code-block:: python

        from core_aws.cdc.targets import KinesisDataStreamTarget

        target = KinesisDataStreamTarget(
            aws_region="us-east-1",
            stream_name="my-data-stream",
            partition_key="user_id"
        )

        target.save(records)
    ..
    """

    # Client wrapper through which every batch is sent; created in __init__.
    client: KinesisClient

    def __init__(
        self,
        aws_region: str,
        stream_name: str,
        partition_key: str,
        **kwargs
    ) -> None:
        """
        Set up the target and create its Kinesis client.

        :param aws_region: AWS region the Kinesis client is created for.
        :param stream_name: Name of the Kinesis Data Stream that receives the records.
        :param partition_key: Partition key handed to the client on every send.
        :param kwargs: Extra keyword arguments forwarded untouched to ``ITarget.__init__``.
        """

        super().__init__(**kwargs)

        # Plain configuration first; the client is built last from the
        # stored region.
        self.stream_name = stream_name
        self.partition_key = partition_key
        self.aws_region = aws_region
        self.client = KinesisClient(region=self.aws_region)

    def _save(self, records: List[Record], **kwargs):
        """
        Serialize the given CDC records and send them to the configured stream.

        :param records: CDC records to send; each one is converted with ``to_json()``.
        :param kwargs: Extra keyword arguments forwarded to the client's ``send_records`` call.
        :return: Whatever the Kinesis client's ``send_records`` operation returns.
        """

        # Serialize up front so the client call only receives ready-made payloads.
        payloads = [record.to_json() for record in records]

        return self.client.send_records(
            records=payloads,
            stream_name=self.stream_name,
            partition_key=self.partition_key,
            **kwargs
        )