# Source code for core_aws.cdc.targets.sns

# -*- coding: utf-8 -*-

"""
Amazon SNS CDC target implementation.

This module provides a CDC (Change Data Capture) target that publishes records
to AWS Simple Notification Service (SNS) topics for fan-out messaging patterns
and event-driven architectures.
"""

from typing import List

from core_cdc.base import Record
from core_cdc.targets.base import ITarget

from core_aws.services import AwsClientException
from core_aws.services.sns.client import SnsClient
from core_aws.services.sns.client import SnsMessage


class SnsTarget(ITarget):
    """
    CDC target for publishing records to AWS SNS topics enabling
    fan-out messaging patterns where multiple subscribers can receive
    the same data changes.

    **Example:**

    .. code-block:: python

        from core_aws.cdc.targets import SnsTarget

        target = SnsTarget(
            aws_region="us-east-1",
            topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
            batch_size=10
        )

        target.save(records)
    ..
    """

    # SNS client wrapper used for every publish call made by this target.
    client: SnsClient

    def __init__(
        self,
        aws_region: str,
        topic_arn: str,
        batch_size: int = 10,
        **kwargs
    ) -> None:
        """
        Initialize the SNS target.

        :param str aws_region: AWS region where the SNS topic is located.
        :param str topic_arn: ARN of the target SNS topic.
        :param int batch_size: Maximum number of messages per batch;
            forwarded to the underlying :class:`SnsClient`.
        :param kwargs: Additional arguments passed to the base ITarget class.
        """

        super().__init__(**kwargs)

        self.aws_region = aws_region
        self.client = SnsClient(region=aws_region, batch_size=batch_size)
        self.topic_arn = topic_arn

        # NOTE(review): presumably tells the base target that DDL statements
        # are not applicable to a messaging target -- confirm against ITarget.
        self.execute_ddl = False

    def _save(self, records: List[Record], **kwargs) -> None:
        """
        Publish CDC records to the SNS topic in batches.

        :param List[Record] records: List of records to publish to the topic.
            An empty list is a no-op: no request is sent.
        :param kwargs: Additional keyword arguments (currently unused).

        :raises AwsClientException: If the publish result reports any
            entries under its ``"Failed"`` key.

        .. note::
            Messages are published with an ID format of "{table_name}-{index}"
            to track individual records within batches.
        """

        # Nothing to publish: skip the client call entirely instead of
        # issuing a request with an empty message list.
        if not records:
            return

        # NOTE(review): SNS batch entry Ids accept only alphanumerics, hyphens
        # and underscores (max 80 chars) -- confirm table names comply.
        messages = [
            SnsMessage(
                Id=f"{rec.table_name}-{x}",
                Message=rec.to_json(),
            )
            for x, rec in enumerate(records)
        ]

        result = self.client.publish_batch(
            topic_arn=self.topic_arn,
            messages=messages,
        )

        # Surface partial failures explicitly rather than dropping records.
        failed = result.get("Failed")
        if failed:
            raise AwsClientException(
                f"Failed to publish {len(failed)} record(s) to SNS: {failed}"
            )