Source code for core_aws.cdc.targets.sqs

# -*- coding: utf-8 -*-

"""
Amazon SQS CDC target implementation.

This module provides a CDC (Change Data Capture) target that sends records
to AWS Simple Queue Service (SQS) queues for reliable message queuing and
asynchronous processing.
"""

import json
from typing import List, Optional
from uuid import uuid4

from core_cdc.base import Record
from core_cdc.targets.base import ITarget
from core_mixins.utils import get_batches

from core_aws.services.sqs.client import SqsClient


class SqsTarget(ITarget):
    """
    CDC target for sending records to AWS SQS queues enabling reliable
    message queuing for asynchronous data processing workflows.

    **Example:**

    .. code-block:: python

        from core_aws.cdc.targets import SqsTarget

        target = SqsTarget(
            aws_region="us-east-1",
            queue_name="my-queue"
        )

        target.init_client()
        target.save(records)
    ..
    """

    # Maximum number of entries SQS accepts in one SendMessageBatch request.
    MAX_BATCH_SIZE = 10

    client: SqsClient

    def __init__(self, aws_region: str, queue_name: str, **kwargs) -> None:
        """
        Initialize the SQS target.

        The SQS client is created immediately, but the queue URL is only
        resolved later by :meth:`init_client`.

        :param str aws_region: AWS region where the SQS queue is located.
        :param str queue_name: Name of the target SQS queue.
        :param kwargs: Additional arguments passed to the base ITarget class.
        """
        super().__init__(**kwargs)
        self.aws_region = aws_region
        self.queue_url: Optional[str] = None
        # A queue has no schema, so DDL events are never applied here.
        self.execute_ddl = False
        self.queue_name = queue_name
        self.client = SqsClient(region=aws_region)

    def init_client(self, **kwargs) -> None:
        """
        Resolve the queue URL from the configured queue name.

        This method must be called before saving records. :meth:`_save`
        refuses to run while ``queue_url`` is unset.

        :param kwargs: Additional keyword arguments (currently unused).
        :raises: Whatever ``SqsClient.get_queue_by_name`` raises when the
            queue cannot be found (presumably a botocore ``ClientError`` —
            confirm against the client wrapper).
        """
        self.queue_url = self.client.get_queue_by_name(self.queue_name).url

    def _save(self, records: List[Record], **kwargs) -> None:
        """
        Send CDC records to the SQS queue in batches.

        Records are serialized and sent in batches of at most
        ``MAX_BATCH_SIZE`` entries, the SQS batch limit. Each entry gets a
        freshly generated UUID as its batch-local ``Id``.

        :param List[Record] records: List of CDC records to send to the queue.
        :param kwargs: Additional keyword arguments (currently unused).
        :raises RuntimeError: If ``queue_url`` is not initialized
            (``init_client`` not called), or if SQS reports any entry of a
            batch as failed. Batches sent before the failing one have
            already been delivered.

        .. note::
            Messages are sent with zero delay and without ``MessageGroupId``
            or ``MessageDeduplicationId``. FIFO queues require a message
            group ID, and a deduplication ID unless content-based
            deduplication is enabled. For FIFO queues you therefore need to
            customize the implementation.
        """
        if not self.queue_url:
            raise RuntimeError(
                "SQS queue URL not initialized. Call init_client() first."
            )

        # NOTE(review): assumes Record.to_json() returns a JSON-serializable
        # object (e.g. a dict), not an already-encoded string; otherwise
        # json.dumps below would double-encode it — confirm.
        payloads = [record.to_json() for record in records]

        for batch in get_batches(payloads, self.MAX_BATCH_SIZE):
            response = self.client.send_message_batch(
                queue_url=self.queue_url,
                entries=[
                    {
                        "Id": str(uuid4()),
                        "MessageBody": json.dumps(payload),
                        "DelaySeconds": 0,
                    }
                    for payload in batch
                ],
            )

            # SendMessageBatch succeeds at the HTTP level even when some
            # entries are rejected; rejections are only listed in "Failed".
            self._raise_on_failed_entries(response)

    @staticmethod
    def _raise_on_failed_entries(response) -> None:
        """
        Raise if a SendMessageBatch response lists rejected entries.

        Responses that are not dicts are ignored. This keeps the check
        compatible with client wrappers that do not return the raw API
        response.

        :param response: Value returned by ``SqsClient.send_message_batch``.
        :raises RuntimeError: If the response contains a non-empty
            ``Failed`` list.
        """
        if not isinstance(response, dict):
            return

        failed = response.get("Failed") or []
        if not failed:
            return

        details = "; ".join(
            f"{item.get('Id')}: {item.get('Code')} - {item.get('Message')}"
            for item in failed
        )
        raise RuntimeError(
            f"SQS rejected {len(failed)} message(s) in batch: {details}"
        )