Source code for core_aws.cdc.targets.sqs
# -*- coding: utf-8 -*-
"""
Amazon SQS CDC target implementation.
This module provides a CDC (Change Data Capture) target that sends records
to AWS Simple Queue Service (SQS) queues for reliable message queuing and
asynchronous processing.
"""
import json
from typing import List, Optional
from uuid import uuid4
from core_cdc.base import Record
from core_cdc.targets.base import ITarget
from core_mixins.utils import get_batches
from core_aws.services.sqs.client import SqsClient
[docs]
class SqsTarget(ITarget):
    """
    CDC target for sending records to AWS SQS queues enabling
    reliable message queuing for asynchronous data
    processing workflows.

    **Example:**

    .. code-block:: python

        from core_aws.cdc.targets import SqsTarget

        target = SqsTarget(
            aws_region="us-east-1",
            queue_name="my-queue"
        )

        target.init_client()
        target.save(records)
    ..
    """

    client: SqsClient

    # Maximum number of entries SQS accepts in one SendMessageBatch request.
    _MAX_BATCH_ENTRIES = 10

    def __init__(self, aws_region: str, queue_name: str, **kwargs) -> None:
        """
        Initialize the SQS target.

        :param str aws_region: AWS region where the SQS queue is located.
        :param str queue_name: Name of the target SQS queue.
        :param kwargs: Additional arguments passed to the base ITarget class.
        """

        super().__init__(**kwargs)

        self.aws_region = aws_region
        # Resolved lazily by init_client(); stays None until then.
        self.queue_url: Optional[str] = None
        # Set after the base initializer so it always ends up False,
        # regardless of what was passed through kwargs.
        self.execute_ddl = False
        self.queue_name = queue_name
        self.client = SqsClient(region=aws_region)

    def init_client(self, **kwargs) -> None:
        """
        Resolve the queue URL from the configured queue name.

        This method must be called before saving records, because
        ``_save`` refuses to run while ``queue_url`` is unset.

        :param kwargs: Additional keyword arguments (currently unused).
        :raises: Whatever error the underlying client raises when the
            queue cannot be resolved (e.g. the queue does not exist).
        """

        self.queue_url = self.client.get_queue_by_name(self.queue_name).url

    def _save(self, records: List[Record], **kwargs) -> None:
        """
        Send CDC records to the SQS queue in batches.

        Records are converted with ``Record.to_json()``, serialized with
        ``json.dumps`` and sent in batches of at most 10 entries (the SQS
        batch limit), each entry carrying a freshly generated unique id.

        :param List[Record] records: List of CDC records to send to the queue.
        :param kwargs: Additional keyword arguments (currently unused).

        :raises RuntimeError: If queue_url is not initialized (init_client
            not called), or if SQS reports that one or more entries of a
            batch were rejected.

        .. note::
            Messages are sent with zero delay and no deduplication settings.
            For FIFO queues, you may need to customize the implementation.

        .. note::
            Batches are sent sequentially. When a batch reports failures the
            method raises immediately; batches sent before it have already
            been delivered to the queue.
        """

        if not self.queue_url:
            raise RuntimeError("SQS queue URL not initialized. Call init_client() first.")

        payloads = [record.to_json() for record in records]

        for batch in get_batches(payloads, self._MAX_BATCH_ENTRIES):
            # FIFO queues would additionally need "MessageGroupId" (and
            # possibly "MessageDeduplicationId") on every entry.
            entries = [
                {
                    "Id": str(uuid4()),
                    "MessageBody": json.dumps(entry),
                    "DelaySeconds": 0,
                }
                for entry in batch
            ]

            response = self.client.send_message_batch(
                queue_url=self.queue_url,
                entries=entries,
            )

            # A batch request can succeed at the HTTP level while individual
            # entries are rejected; those are listed under "Failed". Ignoring
            # them would silently drop CDC records.
            # NOTE(review): assumes the client wrapper returns the raw SQS
            # response mapping; any other return value skips the check.
            failed = response.get("Failed") if isinstance(response, dict) else None
            if failed:
                raise RuntimeError(
                    f"SQS rejected {len(failed)} of {len(entries)} messages "
                    f"sent to queue '{self.queue_name}': {failed}"
                )