CDC#

Kinesis Target#

Kinesis Data Stream CDC target implementation.

This module provides a CDC (Change Data Capture) target that sends records to AWS Kinesis Data Streams for real-time data streaming and processing.

class core_aws.cdc.targets.kinesis.KinesisDataStreamTarget(aws_region: str, stream_name: str, partition_key: str, **kwargs)[source]#

Bases: ITarget

CDC target for sending records to AWS Kinesis Data Streams enabling real-time data processing and analytics pipelines.

Example:

from core_aws.cdc.targets import KinesisDataStreamTarget

target = KinesisDataStreamTarget(
    aws_region="us-east-1",
    stream_name="my-data-stream",
    partition_key="user_id"
)
target.save(records)
__init__(aws_region: str, stream_name: str, partition_key: str, **kwargs) None[source]#

Initialize the Kinesis Data Stream target.

Parameters:
  • aws_region – AWS region where the Kinesis stream is located.

  • stream_name – Name of the target Kinesis Data Stream.

  • partition_key – Partition key used to distribute records across shards.

  • kwargs – Additional keyword arguments passed to the base ITarget class.

client: KinesisClient#
_abc_impl = <_abc._abc_data object>#
_save(records: List[Record], **kwargs)[source]#

Save CDC records to the Kinesis Data Stream.

Parameters:
  • records – List of CDC records to send to the stream.

  • kwargs – Additional keyword arguments passed to the Kinesis client.

Returns:

Response from the Kinesis clients send_records operation.

SNS Target#

Amazon SNS CDC target implementation.

This module provides a CDC (Change Data Capture) target that publishes records to AWS Simple Notification Service (SNS) topics for fan-out messaging patterns and event-driven architectures.

class core_aws.cdc.targets.sns.SnsTarget(aws_region: str, topic_arn: str, batch_size: int = 10, **kwargs)[source]#

Bases: ITarget

CDC target for publishing records to AWS SNS topics enabling fan-out messaging patterns where multiple subscribers can receive the same data changes.

Example:

from core_aws.cdc.targets import SnsTarget

target = SnsTarget(
    aws_region="us-east-1",
    topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
    batch_size=10
)
target.save(records)
__init__(aws_region: str, topic_arn: str, batch_size: int = 10, **kwargs) None[source]#

Initialize the SNS target.

Parameters:
  • aws_region (str) – AWS region where the SNS topic is located.

  • topic_arn (str) – ARN of the target SNS topic.

  • batch_size (int) – Maximum number of messages per batch.

  • kwargs – Additional arguments passed to the base ITarget class.

client: SnsClient#
_abc_impl = <_abc._abc_data object>#
_save(records: List[Record], **kwargs)[source]#

Publish CDC records to the SNS topic in batches.

Parameters:
  • records (List[Record]) – List of records to publish to the topic.

  • kwargs – Additional keyword arguments (currently unused).

Note

Messages are published with an ID format of “{table_name}-{index}” to track individual records within batches.

SQS Target#

Amazon SQS CDC target implementation.

This module provides a CDC (Change Data Capture) target that sends records to AWS Simple Queue Service (SQS) queues for reliable message queuing and asynchronous processing.

class core_aws.cdc.targets.sqs.SqsTarget(aws_region: str, queue_name: str, **kwargs)[source]#

Bases: ITarget

CDC target for sending records to AWS SQS queues enabling reliable message queuing for asynchronous data processing workflows.

Example:

from core_aws.cdc.targets import SqsTarget

target = SqsTarget(
    aws_region="us-east-1",
    queue_name="my-queue"
)
target.init_client()
target.save(records)
__init__(aws_region: str, queue_name: str, **kwargs) None[source]#

Initialize the SQS target.

Parameters:
  • aws_region (str) – AWS region where the SQS queue is located.

  • queue_name (str) – Name of the target SQS queue.

  • kwargs – Additional arguments passed to the base ITarget class.

client: SqsClient#
init_client(**kwargs) None[source]#

Initialize the SQS client and resolve the queue URL.

This method must be called before saving records to properly initialize the queue URL from the queue name.

Parameters:

kwargs – Additional keyword arguments (currently unused).

Raises:

Boto3 ClientError if the queue doesn’t exist.

_abc_impl = <_abc._abc_data object>#
_save(records: List[Record], **kwargs)[source]#

Send CDC records to the SQS queue in batches.

Records are sent in batches of 10 (SQS batch limit) with unique message IDs generated for each record.

Parameters:
  • records (List[Record]) – List of CDC records to send to the queue.

  • kwargs – Additional keyword arguments (currently unused).

Raises:

RuntimeError – If queue_url is not initialized (init_client not called).

Note

Messages are sent with zero delay and no deduplication settings. For FIFO queues, you may need to customize the implementation.