Source code for core_aws.cdc.targets.sns
# -*- coding: utf-8 -*-
"""
Amazon SNS CDC target implementation.
This module provides a CDC (Change Data Capture) target that publishes records
to AWS Simple Notification Service (SNS) topics for fan-out messaging patterns
and event-driven architectures.
"""
from typing import List
from core_cdc.base import Record
from core_cdc.targets.base import ITarget
from core_aws.services import AwsClientException
from core_aws.services.sns.client import SnsClient
from core_aws.services.sns.client import SnsMessage
[docs]
class SnsTarget(ITarget):
"""
CDC target for publishing records to AWS SNS topics enabling
fan-out messaging patterns where multiple subscribers can
receive the same data changes.
**Example:**
.. code-block:: python
from core_aws.cdc.targets import SnsTarget
target = SnsTarget(
aws_region="us-east-1",
topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
batch_size=10
)
target.save(records)
..
"""
client: SnsClient
[docs]
def __init__(
self,
aws_region: str,
topic_arn: str,
batch_size: int = 10,
**kwargs
) -> None:
"""
Initialize the SNS target.
:param str aws_region: AWS region where the SNS topic is located.
:param str topic_arn: ARN of the target SNS topic.
:param int batch_size: Maximum number of messages per batch.
:param kwargs: Additional arguments passed to the base ITarget class.
"""
super().__init__(**kwargs)
self.aws_region = aws_region
self.client = SnsClient(region=aws_region, batch_size=batch_size)
self.topic_arn = topic_arn
self.execute_ddl = False
[docs]
def _save(self, records: List[Record], **kwargs):
"""
Publish CDC records to the SNS topic in batches.
:param List[Record] records: List of records to publish to the topic.
:param kwargs: Additional keyword arguments (currently unused).
.. note::
Messages are published with an ID format of "{table_name}-{index}"
to track individual records within batches.
"""
result = self.client.publish_batch(
topic_arn=self.topic_arn,
messages=[
SnsMessage(
Id=f"{rec.table_name}-{x}",
Message=rec.to_json(),
)
for x, rec in enumerate(records)
]
)
if result.get("Failed"):
raise AwsClientException(
f"Failed to publish {len(result['Failed'])} record(s) to SNS: {result['Failed']}"
)