# Source code for core_aws.services.sqs.client

# -*- coding: utf-8 -*-

"""
AWS Simple Queue Service (SQS) client wrapper.

This module provides a high-level interface for interacting with AWS SQS,
including message sending, receiving, deletion, and batch operations.
"""

from typing import Any, Dict, Iterator, List

import boto3
from botocore.exceptions import ClientError
from core_aws.services.base import AwsClient
from core_aws.services.base import AwsClientException
from core_mixins.utils import get_batches


class SqsClient(AwsClient):
    """
    Client for AWS Simple Queue Service (SQS).

    This client provides methods for sending, receiving, and deleting
    messages from SQS queues. It supports both single and batch operations,
    with automatic retry logic for failed deletions
    (see :meth:`delete_messages`).

    Example:

    .. code-block:: python

        # Initialize client
        sqs = SqsClient(region="us-east-1")

        # Get a queue by name
        queue = sqs.get_queue_by_name("my-queue")

        # Send a message
        sqs.send_message(queue.url, "Hello, SQS!")

        # Receive messages
        messages = sqs.receive_messages(queue.url)
        for msg in messages:
            print(msg["Body"])
    ..
    """

    client: "mypy_boto3_sqs.client.SQSClient"  # type: ignore[name-defined]  # noqa: F821
    resource: Any  # boto3.resources.factory.sqs.ServiceResource

    def __init__(self, region: str, **kwargs: Any) -> None:
        """
        Initialize the SQS client.

        :param region: AWS region name (e.g., 'us-east-1', 'eu-west-1').
        :param kwargs:
            Additional keyword arguments. They are forwarded both to the
            base ``AwsClient`` initializer (i.e. the low-level client) and
            to ``boto3.resource()``, so they must be accepted by both.
        """

        super().__init__("sqs", region_name=region, **kwargs)
        self.resource = boto3.resource("sqs", region_name=region, **kwargs)

    def create_queue(self, queue_name: str, **kwargs: Any) -> str:
        """
        Create a new SQS queue with the specified name and attributes.

        Creates a standard or FIFO queue. Queue names are limited to 80
        characters including alphanumeric characters, hyphens (-), and
        underscores (_). FIFO queue names must end with the .fifo suffix.

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/create_queue.html

        :param queue_name: Name of the queue to create (max 80 characters).
        :param kwargs: Additional boto3 parameters:

            - **Attributes** (dict): Queue configuration attributes:

              - **DelaySeconds** (str): Delivery delay in seconds (0-900). Default: 0.
              - **MaximumMessageSize** (str): Max message size in bytes (1024-262144).
                Default: 262144.
              - **MessageRetentionPeriod** (str): Message retention in seconds
                (60-1209600). Default: 345600 (4 days).
              - **ReceiveMessageWaitTimeSeconds** (str): Long-polling wait time (0-20).
                Default: 0.
              - **VisibilityTimeout** (str): Visibility timeout in seconds (0-43200).
                Default: 30.
              - **FifoQueue** (str): 'true' for FIFO queue, 'false' for standard.
                Default: 'false'.
              - **ContentBasedDeduplication** (str): 'true' to enable content-based
                deduplication (FIFO only).
              - **KmsMasterKeyId** (str): ID of AWS KMS key for server-side encryption.
              - **KmsDataKeyReusePeriodSeconds** (str): KMS key reuse period (60-86400).
                Default: 300.
              - **DeduplicationScope** (str): 'messageGroup' or 'queue'
                (high-throughput FIFO only).
              - **FifoThroughputLimit** (str): 'perQueue' or 'perMessageGroupId'
                (high-throughput FIFO only).
              - **RedrivePolicy** (str): JSON string defining dead-letter queue:

                .. code-block:: python

                    {
                        "deadLetterTargetArn": "arn:aws:sqs:region:account:dlq-name",
                        "maxReceiveCount": "3"
                    }
                ..

              - **RedriveAllowPolicy** (str): JSON string defining which source queues
                can use this as DLQ.

            - **tags** (dict): Key-value pairs to assign as queue tags.

        :return:
            The queue URL string
            (e.g., "https://sqs.region.amazonaws.com/account/queue-name").
        :raises AwsClientException: If queue creation fails.

        Example:

        .. code-block:: python

            import json

            sqs = SqsClient(region="us-east-1")

            # Create a basic standard queue
            queue_url = sqs.create_queue(queue_name="my-queue")
            print(f"Queue URL: {queue_url}")

            # Create a FIFO queue with custom attributes
            queue_url = sqs.create_queue(
                queue_name="my-queue.fifo",
                Attributes={
                    "FifoQueue": "true",
                    "ContentBasedDeduplication": "true",
                    "MessageRetentionPeriod": "86400",  # 1 day
                    "VisibilityTimeout": "60"
                }
            )

            # Create a queue with dead-letter queue
            queue_url = sqs.create_queue(
                queue_name="my-main-queue",
                Attributes={
                    "RedrivePolicy": json.dumps({
                        "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:my-dlq",
                        "maxReceiveCount": "5"
                    }),
                    "VisibilityTimeout": "30"
                },
                tags={
                    "Environment": "production",
                    "Application": "my-app"
                }
            )
        ..
        """

        try:
            response = self.client.create_queue(QueueName=queue_name, **kwargs)
            return response["QueueUrl"]

        except ClientError as error:
            raise AwsClientException(error) from error

    def get_queue_by_name(
        self,
        queue_name: str,
        **kwargs: Any
    ) -> "mypy_boto3_sqs.service_resource.Queue":  # type: ignore[name-defined]  # noqa: F821
        """
        Retrieve an existing Amazon SQS queue by name.

        Returns a Queue resource object that can be used to interact with
        the queue. To access a queue owned by another AWS account, use the
        `QueueOwnerAWSAccountId` parameter.

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/service-resource/get_queue_by_name.html

        :param queue_name: The name of the queue.
        :param kwargs: Additional boto3 parameters:

            - **QueueOwnerAWSAccountId** (str): AWS account ID of the account that
              created the queue. Required for accessing queues in other accounts.

        :return: sqs.Queue resource object with the following attributes:

            - **url** (str): Queue URL
              (https://sqs.<region>.amazonaws.com/<account>/QueueName)
            - **dead_letter_source_queues**: Dead letter queue information
            - **meta**: Queue metadata
            - **attributes** (dict): Queue attributes dictionary:

              .. code-block:: python

                  {
                      'QueueArn': 'arn:aws:sqs:...',
                      'ApproximateNumberOfMessages': '0',
                      'ApproximateNumberOfMessagesNotVisible': '0',
                      'ApproximateNumberOfMessagesDelayed': '0',
                      'CreatedTimestamp': '1699539978',
                      'LastModifiedTimestamp': '1699540164',
                      'VisibilityTimeout': '300',
                      'MaximumMessageSize': '262144',
                      'MessageRetentionPeriod': '3600',
                      'DelaySeconds': '60',
                      ...
                  }
              ..

        :raises AwsClientException: If the queue cannot be retrieved.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Get a queue in same account
            queue = sqs.get_queue_by_name("my-queue")
            print(f"Queue URL: {queue.url}")

            # Get a queue in another account
            queue = sqs.get_queue_by_name(
                "cross-account-queue",
                QueueOwnerAWSAccountId="123456789012"
            )
        ..
        """

        try:
            return self.resource.get_queue_by_name(QueueName=queue_name, **kwargs)

        except ClientError as error:
            raise AwsClientException(error) from error

    def send_message(
        self,
        queue_url: str,
        message: str,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """
        Send a single message to an SQS queue.

        Delivers a message to the specified queue. Messages can contain XML,
        JSON, or unformatted text. Allowed Unicode characters: #x9, #xA, #xD,
        #x20 to #xD7FF, #xE000 to #xFFFD, #x10000 to #x10FFFF.

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.send_message

        :param queue_url: URL of the SQS queue.
        :param message: Message body (up to 256 KB).
        :param kwargs: Additional boto3 parameters:

            - **DelaySeconds** (int): Delay before message becomes available
              (0-900 seconds).
            - **MessageAttributes** (dict): Custom attributes for the message.
            - **MessageDeduplicationId** (str): Deduplication ID for FIFO queues.
            - **MessageGroupId** (str): Message group ID for FIFO queues (required).

        :return: Dictionary containing MessageId, MD5OfMessageBody, and SequenceNumber.
        :raises AwsClientException: If message sending fails.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Send a simple message
            response = sqs.send_message(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                message="Hello, SQS!"
            )
            print(f"Message ID: {response['MessageId']}")

            # Send with delay and attributes
            sqs.send_message(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                message='{"event": "user_signup", "user_id": 123}',
                DelaySeconds=30,
                MessageAttributes={
                    "EventType": {
                        "StringValue": "UserSignup",
                        "DataType": "String"
                    }
                }
            )
        ..
        """

        try:
            return self.client.send_message(
                QueueUrl=queue_url,
                MessageBody=message,
                **kwargs)

        except ClientError as error:
            raise AwsClientException(error) from error

    def send_message_batch(
        self,
        queue_url: str,
        entries: List[Dict[str, Any]],
        **kwargs: Any
    ) -> Dict[str, Any]:
        """
        Send multiple messages to an SQS queue in a single request.

        Delivers up to 10 messages to the specified queue in a single batch
        operation. For FIFO queues, messages within a batch are enqueued in
        the order they are sent.

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/send_message_batch.html

        :param queue_url: URL of the SQS queue.
        :param entries: List of message entries (max 10). Each entry must contain:

            - **Id** (str): Unique identifier for the message (within batch).
            - **MessageBody** (str): Message content (up to 256 KB).
            - **DelaySeconds** (int, optional): Message delay (0-900 seconds).
            - **MessageAttributes** (dict, optional): Custom message attributes.
            - **MessageDeduplicationId** (str, optional): For FIFO queues.
            - **MessageGroupId** (str, optional): For FIFO queues.

        :param kwargs: Additional boto3 parameters.

        :return: Dictionary containing Successful and Failed lists:

            .. code-block:: python

                {
                    "Successful": [
                        {
                            "Id": "string",
                            "MessageId": "string",
                            "MD5OfMessageBody": "string",
                            "SequenceNumber": "string"
                        }
                    ],
                    "Failed": [
                        {
                            "Id": "string",
                            "SenderFault": True|False,
                            "Code": "string",
                            "Message": "string"
                        }
                    ]
                }
            ..

        :raises AwsClientException: If batch sending fails.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Send multiple messages
            response = sqs.send_message_batch(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                entries=[
                    {
                        "Id": "msg1",
                        "MessageBody": "First message"
                    },
                    {
                        "Id": "msg2",
                        "MessageBody": "Second message",
                        "DelaySeconds": 10
                    },
                    {
                        "Id": "msg3",
                        "MessageBody": "Third message"
                    }
                ]
            )

            print(f"Successful: {len(response['Successful'])}")
            print(f"Failed: {len(response['Failed'])}")
        ..
        """

        try:
            return self.client.send_message_batch(
                QueueUrl=queue_url,
                Entries=entries,
                **kwargs)

        except ClientError as error:
            raise AwsClientException(error) from error

    def receive_messages(
        self,
        queue_url: str,
        max_number_of_msg: int = 10,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """
        Receive one or more messages from an SQS queue.

        Retrieves up to 10 messages from the specified queue. Use the
        `WaitTimeSeconds` parameter to enable long-polling (recommended for
        reducing empty receives and API costs).

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/receive_message.html

        :param queue_url: URL of the SQS queue.
        :param max_number_of_msg: Maximum number of messages to retrieve (1-10). Default: 10.
        :param kwargs: Additional boto3 parameters:

            - **AttributeNames** (list): System attributes to retrieve
              (e.g., ['All', 'ApproximateReceiveCount']).
            - **MessageAttributeNames** (list): Custom message attributes to retrieve
              (e.g., ['All']).
            - **VisibilityTimeout** (int): Duration message is hidden after retrieval
              (0-43200 seconds).
            - **WaitTimeSeconds** (int): Long-polling wait time (0-20 seconds).

        :return:
            List of message dictionaries; an empty list when the response
            carries no "Messages" key:

            .. code-block:: python

                [
                    {
                        "MessageId": "string",
                        "ReceiptHandle": "string",
                        "MD5OfBody": "string",
                        "Body": "string",
                        "Attributes": {
                            "ApproximateReceiveCount": "1",
                            "SentTimestamp": "1699539978000"
                        },
                        "MD5OfMessageAttributes": "string",
                        "MessageAttributes": {
                            "AttributeName": {
                                "StringValue": "string",
                                "BinaryValue": b"bytes",
                                "DataType": "String"
                            }
                        }
                    }
                ]
            ..

        :raises AwsClientException: If message retrieval fails.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Basic receive
            messages = sqs.receive_messages(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
            )

            for msg in messages:
                print(f"Message: {msg['Body']}")

            # Long-polling with attributes
            messages = sqs.receive_messages(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                max_number_of_msg=5,
                WaitTimeSeconds=20,  # Long-poll for 20 seconds
                MessageAttributeNames=['All'],
                AttributeNames=['All']
            )
        ..
        """

        try:
            return self.client.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=max_number_of_msg,
                **kwargs).get("Messages", [])

        except ClientError as error:
            raise AwsClientException(error) from error

    def retrieve_all_messages(
        self,
        queue_url: str,
        **kwargs: Any
    ) -> Iterator[Dict[str, Any]]:
        """
        Retrieve all messages from a queue using an iterator.

        Continuously polls the queue and yields messages until a poll
        returns no messages. Useful for processing all messages in a queue.

        :param queue_url: URL of the SQS queue.
        :param kwargs: Additional parameters passed to `receive_messages()`.

        :return: Iterator yielding message dictionaries.
        :raises AwsClientException:
            Propagated from `receive_messages()` when a poll fails; raised
            lazily, while the iterator is being consumed.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Process all messages in queue
            for message in sqs.retrieve_all_messages(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                WaitTimeSeconds=5
            ):
                print(f"Processing: {message['Body']}")
                # Process the message...
                sqs.delete_message(queue_url, message['ReceiptHandle'])
        ..

        Warning:
            This method will continue until the queue is empty. Make sure to
            delete messages after processing to avoid infinite loops.
        """

        while True:
            messages = self.receive_messages(queue_url=queue_url, **kwargs)
            if not messages:
                break

            yield from messages

    def delete_message(
        self,
        queue_url: str,
        receipt_handle: str,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """
        Delete a single message from an SQS queue.

        Deletes the specified message using its ReceiptHandle (not MessageId).
        Messages are automatically deleted after the retention period expires.
        Amazon SQS can delete a message even if it's locked by visibility
        timeout.

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/delete_message.html

        :param queue_url: URL of the SQS queue.
        :param receipt_handle: Receipt handle of the message (from receive_message).
        :param kwargs: Additional boto3 parameters.

        :return: Empty dictionary on success.
        :raises AwsClientException: If message deletion fails.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Receive and delete a message
            messages = sqs.receive_messages(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
            )

            for msg in messages:
                # Process the message...
                print(msg['Body'])

                # Delete after processing
                sqs.delete_message(
                    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                    receipt_handle=msg['ReceiptHandle']
                )
        ..
        """

        try:
            return self.client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                **kwargs)

        except ClientError as error:
            raise AwsClientException(error) from error

    def delete_message_batch(
        self,
        queue_url: str,
        entries: List[Dict[str, str]],
        **kwargs: Any
    ) -> Dict[str, Any]:
        """
        Delete multiple messages from an SQS queue in a single request.

        Deletes up to 10 messages in a single batch operation. Each entry
        must include the message Id and ReceiptHandle.

        Reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/delete_message_batch.html

        :param queue_url: URL of the SQS queue.
        :param entries: List of message entries to delete (max 10). Each entry must contain:

            - **Id** (str): Unique identifier for this deletion (within batch).
            - **ReceiptHandle** (str): Receipt handle from receive_message.

        :param kwargs: Additional boto3 parameters.

        :return: Dictionary containing Successful and Failed lists:

            .. code-block:: python

                {
                    "Successful": [
                        {
                            "Id": "string"
                        }
                    ],
                    "Failed": [
                        {
                            "Id": "string",
                            "SenderFault": True|False,
                            "Code": "string",
                            "Message": "string"
                        }
                    ]
                }
            ..

        :raises AwsClientException: If batch deletion fails.

        Example:

        .. code-block:: python

            sqs = SqsClient(region="us-east-1")

            # Receive messages
            messages = sqs.receive_messages(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
            )

            # Delete in batch
            delete_entries = [
                {
                    "Id": msg['MessageId'],
                    "ReceiptHandle": msg['ReceiptHandle']
                }
                for msg in messages
            ]

            result = sqs.delete_message_batch(
                queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
                entries=delete_entries
            )

            print(f"Deleted: {len(result['Successful'])}")
            print(f"Failed: {len(result['Failed'])}")
        ..
        """

        try:
            return self.client.delete_message_batch(
                QueueUrl=queue_url,
                Entries=entries,
                **kwargs)

        except ClientError as error:
            raise AwsClientException(error) from error

    def delete_messages(
        self,
        queue_url: str,
        entries: List[Dict[str, str]],
        retries: int = 3,
        **kwargs: Any
    ) -> Dict[str, Any]:
        """
        Delete any number of messages, retrying the ones that fail.

        It's a wrapper over `delete_message_batch`: the entries are split
        into batches of 10 and deleted. Entries reported as failed are
        submitted again, until every entry succeeds or `retries` additional
        rounds have been attempted.

        :param queue_url: The SQS queue url.
        :param entries:
            The messages reference to delete. Each entry must contain an
            "Id" key (used to match failures back to entries) plus whatever
            `delete_message_batch` requires (ReceiptHandle).
        :param retries:
            Number of re-tries (extra rounds after the first attempt) in case
            of errors while deleting the messages. Zero or a negative value
            means "attempt once, never retry".
        :param kwargs: Other arguments to pass to delete_message_batch method.

        :return:
            "Successful" accumulates the successes of every round; "Failed"
            holds the failures reported by the last round that was executed
            (empty when everything was eventually deleted).

            .. code-block:: python

                {
                    "Successful": [{
                        "Id": "string"
                    }],
                    "Failed": [{
                        "Id": "string",
                        "SenderFault": True|False,
                        "Code": "string",
                        "Message": "string"
                    }]
                }
            ..

        :raises AwsClientException: Propagated from `delete_message_batch`.
        """

        successful: List[Dict[str, Any]] = []

        def _delete_batch(messages: List[Dict]) -> List[Dict]:
            # One full round: delete `messages` in chunks of 10 (the SQS batch
            # limit), collecting successes globally and returning the failures.
            failures_: List[Dict] = []
            for batch_ in get_batches(messages, 10):
                output = self.delete_message_batch(
                    queue_url=queue_url,
                    entries=batch_,
                    **kwargs)

                successful.extend(output.get("Successful", []))
                failures_.extend(output.get("Failed", []))

            return failures_

        def _still_pending(records: List[Dict], failed: List[Dict]) -> List[Dict]:
            # Keep only the records whose Id was reported as failed.
            failed_ids = {rec["Id"] for rec in failed}
            return [record for record in records if record["Id"] in failed_ids]

        # The failures of the first attempt must be kept: if no retry round
        # runs (retries <= 0) they are what the caller has to be told about.
        failures = _delete_batch(entries)
        pending = _still_pending(entries, failures)

        # `retries > 0` (rather than truthiness) so that a negative value
        # cannot turn this into an unbounded loop.
        while pending and retries > 0:
            failures = _delete_batch(pending)
            pending = _still_pending(pending, failures)
            retries -= 1

        return {
            "Successful": successful,
            "Failed": failures
        }