CDC
Kinesis Target
Kinesis Data Stream CDC target implementation.
This module provides a CDC (Change Data Capture) target that sends records to AWS Kinesis Data Streams for real-time data streaming and processing.
- class core_aws.cdc.targets.kinesis.KinesisDataStreamTarget(aws_region: str, stream_name: str, partition_key: str, **kwargs)
Bases: ITarget
CDC target for sending records to AWS Kinesis Data Streams, enabling real-time data processing and analytics pipelines.
Example:
    from core_aws.cdc.targets import KinesisDataStreamTarget

    target = KinesisDataStreamTarget(
        aws_region="us-east-1",
        stream_name="my-data-stream",
        partition_key="user_id"
    )
    target.save(records)
- __init__(aws_region: str, stream_name: str, partition_key: str, **kwargs) -> None
Initialize the Kinesis Data Stream target.
- Parameters:
aws_region – AWS region where the Kinesis stream is located.
stream_name – Name of the target Kinesis Data Stream.
partition_key – Partition key used to distribute records across shards.
kwargs – Additional keyword arguments passed to the base ITarget class.
- client: KinesisClient
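To illustrate how a partition key distributes records across shards: Kinesis Data Streams hashes each partition key value with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that integer. The sketch below reproduces the mapping locally. It assumes a stream whose shards split the hash space evenly, and `shard_for_key` is a hypothetical helper, not part of core_aws. Whether KinesisDataStreamTarget treats partition_key as a record field name (as "user_id" in the example suggests) or as a literal key is not stated here.

```python
import hashlib

HASH_SPACE = 2 ** 128  # Kinesis hash keys are 128-bit unsigned integers


def shard_for_key(partition_key_value: str, shard_count: int) -> int:
    """Return the index of the shard an evenly split stream would pick."""
    digest = hashlib.md5(partition_key_value.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, byteorder="big")
    # Assumption: shard i owns the i-th equal slice of the hash space.
    return hash_key * shard_count // HASH_SPACE


if __name__ == "__main__":
    for user_id in ("user-1", "user-2", "user-3"):
        print(user_id, "-> shard", shard_for_key(user_id, shard_count=4))
```

Records that share a key value always land on the same shard, so a low-cardinality key can concentrate traffic on a few shards.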
SNS Target
Amazon SNS CDC target implementation.
This module provides a CDC (Change Data Capture) target that publishes records to AWS Simple Notification Service (SNS) topics for fan-out messaging patterns and event-driven architectures.
- class core_aws.cdc.targets.sns.SnsTarget(aws_region: str, topic_arn: str, batch_size: int = 10, **kwargs)
Bases: ITarget
CDC target for publishing records to AWS SNS topics, enabling fan-out messaging patterns in which multiple subscribers receive the same data changes.
Example:
    from core_aws.cdc.targets import SnsTarget

    target = SnsTarget(
        aws_region="us-east-1",
        topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
        batch_size=10
    )
    target.save(records)
- __init__(aws_region: str, topic_arn: str, batch_size: int = 10, **kwargs) -> None
Initialize the SNS target.
- _save(records: List[Record], **kwargs)
Publish CDC records to the SNS topic in batches.
- Parameters:
records (List[Record]) – List of records to publish to the topic.
kwargs – Additional keyword arguments (currently unused).
Note
Messages are published with an ID format of “{table_name}-{index}” to track individual records within batches.
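The batching and ID scheme described in the note can be sketched as follows. This is a minimal illustration, not the SnsTarget implementation: `build_sns_batches` is a hypothetical helper, records are modeled as plain dicts with a `table_name` key, and the index is assumed to restart in every batch (the source does not say whether it is per-batch or global). The entry keys `Id` and `Message` match what boto3's SNS `publish_batch` expects in `PublishBatchRequestEntries`, which accepts at most 10 entries per call.

```python
import json
from typing import Any, Dict, Iterator, List

SNS_MAX_BATCH = 10  # publish_batch accepts at most 10 entries per call


def build_sns_batches(
    records: List[Dict[str, Any]], batch_size: int = SNS_MAX_BATCH
) -> Iterator[List[Dict[str, str]]]:
    """Yield one list of publish_batch entries per batch of records."""
    if not 1 <= batch_size <= SNS_MAX_BATCH:
        raise ValueError("batch_size must be between 1 and 10")
    for start in range(0, len(records), batch_size):
        chunk = records[start:start + batch_size]
        yield [
            {
                # Assumption: the index restarts at 0 in every batch.
                "Id": f"{record['table_name']}-{index}",
                "Message": json.dumps(record),
            }
            for index, record in enumerate(chunk)
        ]


if __name__ == "__main__":
    rows = [{"table_name": "users", "id": i} for i in range(23)]
    for entries in build_sns_batches(rows):
        print(len(entries), entries[0]["Id"], entries[-1]["Id"])
```

Each yielded list could then be passed to a boto3 SNS client as `publish_batch(TopicArn=topic_arn, PublishBatchRequestEntries=entries)`. Entry IDs only need to be unique within a single batch.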
SQS Target
Amazon SQS CDC target implementation.
This module provides a CDC (Change Data Capture) target that sends records to AWS Simple Queue Service (SQS) queues for reliable message queuing and asynchronous processing.
- class core_aws.cdc.targets.sqs.SqsTarget(aws_region: str, queue_name: str, **kwargs)
Bases: ITarget
CDC target for sending records to AWS SQS queues, enabling reliable message queuing for asynchronous data processing workflows.
Example:
    from core_aws.cdc.targets import SqsTarget

    target = SqsTarget(
        aws_region="us-east-1",
        queue_name="my-queue"
    )
    target.init_client()
    target.save(records)
- init_client(**kwargs) -> None
Initialize the SQS client and resolve the queue URL.
Call this method before saving records; it resolves the queue URL from the queue name, and saving fails without it.
- Parameters:
kwargs – Additional keyword arguments (currently unused).
- Raises:
ClientError – If the queue doesn’t exist (raised by Boto3).
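The init-before-save contract can be sketched with a small stand-in class. Everything here is hypothetical and only illustrates the pattern: `QueueHandle` is not SqsTarget, and `FakeSqsClient` replaces a real boto3 SQS client so the sketch runs without AWS access. The one real API detail is the call shape: boto3's `get_queue_url(QueueName=...)` returns a dict with a `QueueUrl` key and raises ClientError when the queue is missing.

```python
from typing import Any, Dict, Optional


class QueueHandle:
    """Hypothetical stand-in showing the init-before-save contract."""

    def __init__(self, client: Any, queue_name: str) -> None:
        self.client = client
        self.queue_name = queue_name
        self.queue_url: Optional[str] = None

    def init_client(self) -> None:
        # With boto3, this call raises ClientError for a missing queue.
        response = self.client.get_queue_url(QueueName=self.queue_name)
        self.queue_url = response["QueueUrl"]

    def require_queue_url(self) -> str:
        # Guard that fails loudly if the URL was never resolved.
        if self.queue_url is None:
            raise RuntimeError("queue_url is not set; call init_client() first")
        return self.queue_url


class FakeSqsClient:
    """In-memory substitute for a boto3 SQS client, for local runs only."""

    def get_queue_url(self, QueueName: str) -> Dict[str, str]:
        return {"QueueUrl": f"https://sqs.example.invalid/123456789012/{QueueName}"}


if __name__ == "__main__":
    handle = QueueHandle(FakeSqsClient(), "my-queue")
    handle.init_client()
    print(handle.require_queue_url())
```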
- _save(records: List[Record], **kwargs)
Send CDC records to the SQS queue in batches.
Records are sent in batches of 10 (SQS batch limit) with unique message IDs generated for each record.
- Parameters:
records (List[Record]) – List of CDC records to send to the queue.
kwargs – Additional keyword arguments (currently unused).
- Raises:
RuntimeError – If queue_url is not initialized (init_client not called).
Note
Messages are sent with zero delay and no deduplication settings. For FIFO queues, you may need to customize the implementation.
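As a rough sketch of the batching described above and of one way it might be adapted for FIFO queues: the hypothetical helper `build_sqs_batches` below groups records (modeled as plain dicts) into lists of at most 10 entries shaped for boto3's SQS `send_message_batch`. The `fifo_group_id` parameter and the SHA-256 content hash used as a deduplication ID are illustrative assumptions, not behavior of SqsTarget.

```python
import hashlib
import json
import uuid
from typing import Any, Dict, Iterator, List, Optional

SQS_MAX_BATCH = 10  # send_message_batch accepts at most 10 entries per call


def build_sqs_batches(
    records: List[Dict[str, Any]], fifo_group_id: Optional[str] = None
) -> Iterator[List[Dict[str, Any]]]:
    """Yield one list of send_message_batch entries per batch of records."""
    for start in range(0, len(records), SQS_MAX_BATCH):
        entries = []
        for record in records[start:start + SQS_MAX_BATCH]:
            body = json.dumps(record, sort_keys=True)
            # A random hex ID keeps entries unique within the batch.
            entry: Dict[str, Any] = {"Id": uuid.uuid4().hex, "MessageBody": body}
            if fifo_group_id is None:
                entry["DelaySeconds"] = 0  # standard queue: no delivery delay
            else:
                # FIFO queues need a group ID and reject per-message delays.
                entry["MessageGroupId"] = fifo_group_id
                # Assumption: identical bodies should be deduplicated.
                entry["MessageDeduplicationId"] = hashlib.sha256(
                    body.encode("utf-8")
                ).hexdigest()
            entries.append(entry)
        yield entries


if __name__ == "__main__":
    rows = [{"table_name": "orders", "id": i} for i in range(25)]
    print([len(batch) for batch in build_sqs_batches(rows)])
```

Each yielded list could be sent with `send_message_batch(QueueUrl=queue_url, Entries=entries)` on a boto3 SQS client. A FIFO queue with content-based deduplication enabled would not need the explicit `MessageDeduplicationId`.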