Source code for core_aws.services.sns.client

# -*- coding: utf-8 -*-

"""
AWS Simple Notification Service (SNS) client wrapper.

This module provides a high-level interface for interacting with AWS SNS,
including message publishing to topics, batch operations, and SMS delivery.
"""

from __future__ import annotations

import json
from collections import defaultdict
from typing import Any, Dict, List, Optional

from botocore.exceptions import ClientError
from core_mixins.utils import get_batches

from core_aws.services.base import AwsClient
from core_aws.services.base import AwsClientException
from core_aws.services.sqs.client import SqsClient


[docs] class SnsMessage: """ Represents an SNS message to be published to a topic, phone number, or endpoint. This class encapsulates all parameters needed to publish a message via SNS, supporting both simple string messages and structured JSON messages for multiprotocol delivery. Example: .. code-block:: python # Simple text message msg = SnsMessage(Message="Hello, SNS!") # Structured message with subject msg = SnsMessage( Message="Important notification", Subject="Alert", Id="msg-001" ) # Dict message (auto-converted to JSON) msg = SnsMessage( Message={"user_id": 123, "event": "signup"}, Id="msg-002" ) # FIFO topic message msg = SnsMessage( Message="Order processed", MessageGroupId="orders", MessageDeduplicationId="order-12345" ) .. """ # noinspection PyPep8Naming
[docs] def __init__( self, Message: Dict | str, Id: Optional[str] = None, Subject: Optional[str] = None, MessageStructure: Optional[str] = None, MessageAttributes: Optional[Dict] = None, MessageDeduplicationId: Optional[str] = None, MessageGroupId: Optional[str] = None, ) -> None: """ Initialize an SNS message. :param Message: The message content to send. Can be either a string or dict. - **String**: Sent as-is to all transport protocols. - **Dict**: Automatically converted to JSON structure with "default" key. Constraints: - Except for SMS: UTF-8 strings, max 256 KB (262,144 bytes). - SMS: Max 140-160 characters depending on encoding. Messages longer than limit are split. Total SMS limit: 1,600 characters. :param Id: Unique identifier for the message within a batch. Required for `publish_batch()` operation, ignored for single `publish()` calls. :param Subject: Optional subject line for email endpoints. Also included in standard JSON messages delivered to other endpoints. :param MessageStructure: Set to "json" to send different messages per protocol. When set to "json", the Message parameter must: - Be valid JSON - Contain at least a "default" key with a string value - Optionally contain protocol-specific keys ("http", "sms", "email", etc.) Example JSON structure: {"default": "Default message", "sms": "Short SMS", "email": "Detailed email"} :param MessageAttributes: Custom message attributes for filtering and routing. Dict of attribute names to attribute values with DataType and StringValue/BinaryValue. :param MessageDeduplicationId: FIFO topics only. Deduplication token (up to 128 alphanumeric + punctuation). Messages with same ID within 5 minutes are treated as duplicates. If topic has ContentBasedDeduplication, this overrides auto-generated ID. :param MessageGroupId: FIFO topics only (required). Message group tag (up to 128 alphanumeric + punctuation). Messages in same group are processed in FIFO order. Messages in different groups may process out of order. """ self.Id = Id self.Message = Message self.Subject = Subject self.MessageStructure = MessageStructure self.MessageAttributes = MessageAttributes self.MessageDeduplicationId = MessageDeduplicationId self.MessageGroupId = MessageGroupId
[docs] def as_dict(self) -> Dict[str, Any]: """ Convert the message to a dictionary for SNS API calls. Automatically handles dict messages by converting them to JSON structure with "default" key. Removes None/empty values. :return: Dictionary ready for SNS publish/publish_batch API. Example: .. code-block:: python msg = SnsMessage(Message="Hello", Subject="Test", Id="msg-1") payload = msg.as_dict() # {"Id": "msg-1", "Message": "Hello", "Subject": "Test"} msg2 = SnsMessage(Message={"data": 123}) payload2 = msg2.as_dict() # { # "Message": '{"default": "{\\"data\\": 123}"}', # "MessageStructure": "json" # } .. """ res = { "Id": self.Id, "Message": self.Message, "Subject": self.Subject, "MessageStructure": self.MessageStructure, "MessageAttributes": self.MessageAttributes, "MessageDeduplicationId": self.MessageDeduplicationId, "MessageGroupId": self.MessageGroupId } # Converting dict messages to JSON structure... if isinstance(self.Message, dict): res["MessageStructure"] = "json" res["Message"] = json.dumps({"default": json.dumps(self.Message)}) # Removing None/empty values... for key, value in list(res.items()): if not value: del res[key] return res
[docs] class SnsClient(AwsClient): """ Client for AWS Simple Notification Service (SNS). This client provides methods for publishing messages to SNS topics, endpoints, and phone numbers. Supports both single and batch operations. Example: .. code-block:: python # Initialize client sns = SnsClient(region="us-east-1") # Publish to topic msg = SnsMessage(Message="Hello, SNS!", Subject="Notification") sns.publish_message(msg, topic_arn="arn:aws:sns:us-east-1:123:my-topic") # Publish multiple messages messages = [ SnsMessage(Message="Message 1", Id="msg-1"), SnsMessage(Message="Message 2", Id="msg-2") ] sns.publish_batch("arn:aws:sns:us-east-1:123:my-topic", messages) .. """ client: "mypy_boto3_sns.client.SNSClient" # type: ignore[name-defined] # noqa: F821 batch_size: int
[docs] def __init__( self, region: str, batch_size: int = 10, **kwargs: Any ) -> None: """ Initialize the SNS client. :param region: AWS region name (e.g., 'us-east-1', 'eu-west-1'). :param batch_size: Maximum messages per batch (default: 10, max: 10). :param kwargs: Additional arguments passed to boto3.client(). """ super().__init__("sns", region_name=region, **kwargs) self.batch_size = batch_size
[docs] def create_topic( self, name: str, attributes: Optional[Dict[str, str]] = None, tags: Optional[List[Dict[str, str]]] = None, data_protection_policy: Optional[str] = None, ) -> str: """ Create a new SNS topic. Creates a standard or FIFO topic based on name suffix (.fifo for FIFO topics). Returns the topic ARN for publishing messages and subscribing endpoints. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/create_topic.html :param name: Topic name. Must be 1-256 characters, alphanumeric plus hyphens and underscores. For FIFO topics, name must end with ".fifo" suffix. :param attributes: Optional topic configuration attributes. Common attributes: - **DeliveryPolicy**: JSON string for delivery retry policy - **DisplayName**: Human-readable name for SMS sender (max 100 chars) - **FifoTopic**: "true" for FIFO topic (auto-set if name ends with .fifo) - **ContentBasedDeduplication**: "true" to enable content-based deduplication (FIFO only) - **KmsMasterKeyId**: AWS KMS key ID for encryption - **Policy**: JSON string for topic access policy :param tags: Optional list of tags to apply to the topic. Each tag is a dict with 'Key' and 'Value' fields. Maximum 50 tags per topic. Example: [{"Key": "Environment", "Value": "Production"}] :param data_protection_policy: Optional JSON string defining data protection policy for sensitive data scanning and redaction. :return: The ARN (Amazon Resource Name) of the created topic. :raises AwsClientException: If topic creation fails. Example: .. code-block:: python sns = SnsClient(region="us-east-1") # Create standard topic topic_arn = sns.create_topic(name="my-notifications") print(f"Created topic: {topic_arn}") # Create FIFO topic with attributes fifo_arn = sns.create_topic( name="my-orders.fifo", attributes={ "FifoTopic": "true", "ContentBasedDeduplication": "true" }, tags=[ {"Key": "Environment", "Value": "Production"}, {"Key": "Application", "Value": "OrderProcessing"} ] ) # Create topic with KMS encryption encrypted_arn = sns.create_topic( name="secure-topic", attributes={ "KmsMasterKeyId": "alias/aws/sns", "DisplayName": "Secure Notifications" } ) .. Note: - Topic names are case-sensitive - Creating a topic with existing name is idempotent (returns existing ARN) - FIFO topics support message ordering and deduplication - Standard topics offer best-effort ordering and at-least-once delivery """ kwargs: Dict[str, Any] = {"Name": name} if attributes: kwargs["Attributes"] = attributes if tags: kwargs["Tags"] = tags if data_protection_policy: kwargs["DataProtectionPolicy"] = data_protection_policy try: response = self.client.create_topic(**kwargs) return response["TopicArn"] except ClientError as error: raise AwsClientException(error) from error
[docs] def subscribe_sqs_queue( self, topic_arn: str, queue_name: str, region: Optional[str] = None, set_queue_policy: bool = True, attributes: Optional[Dict[str, str]] = None, ) -> str: """ Subscribe an SQS queue to an SNS topic by queue name. This is a convenience method that handles the entire subscription process, including setting up the necessary queue policy to allow SNS to send messages. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/subscribe.html :param topic_arn: ARN of the SNS topic to subscribe the queue to. :param queue_name: Name of the SQS queue to subscribe. The queue must exist in the same region as the SNS client or in the specified region. :param region: Optional AWS region where the SQS queue exists. If not specified, uses the same region as the SNS client. :param set_queue_policy: If True (default), automatically sets the SQS queue policy to allow the SNS topic to send messages. Set to False if you want to manage the queue policy manually. :param attributes: Optional subscription attributes. Common attributes: - **RawMessageDelivery**: "true" to deliver raw messages without SNS envelope (default: "false") - **FilterPolicy**: JSON string for message filtering based on attributes - **RedrivePolicy**: JSON string defining dead-letter queue for failed deliveries :return: The subscription ARN for the created subscription. :raises AwsClientException: If subscription fails or queue doesn't exist. Example: .. code-block:: python from core_aws.services.sns.client import SnsClient from core_aws.services.sqs.client import SqsClient sns = SnsClient(region="us-east-1") sqs = SqsClient(region="us-east-1") # Create topic and queue topic_arn = sns.create_topic(name="notifications") queue_url = sqs.create_queue(queue_name="notification-queue") # Subscribe queue to topic (automatically sets queue policy) subscription_arn = sns.subscribe_sqs_queue( topic_arn=topic_arn, queue_name="notification-queue" ) print(f"Subscribed: {subscription_arn}") # Subscribe with raw message delivery subscription_arn = sns.subscribe_sqs_queue( topic_arn=topic_arn, queue_name="notification-queue", attributes={ "RawMessageDelivery": "true" } ) # Subscribe with message filtering subscription_arn = sns.subscribe_sqs_queue( topic_arn=topic_arn, queue_name="notification-queue", attributes={ "FilterPolicy": json.dumps({ "event_type": ["order_placed", "order_shipped"] }) } ) .. Note: - The SQS queue must exist before calling this method - By default, sets queue policy to allow SNS to send messages - Subscription is confirmed automatically for SQS endpoints - For cross-region subscriptions, specify the queue's region """ try: # Initialize SQS client in the appropriate region queue_region = region or self.client.meta.region_name sqs_client = SqsClient(region=queue_region) # Get queue attributes to retrieve ARN queue = sqs_client.get_queue_by_name(queue_name) queue_arn = queue.attributes.get("QueueArn") queue_url = queue.url if not queue_arn: raise AwsClientException( f"Could not retrieve ARN for queue: {queue_name}!" ) # Set queue policy to allow SNS to send messages (if requested) if set_queue_policy: policy = { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "sns.amazonaws.com"}, "Action": "sqs:SendMessage", "Resource": queue_arn, "Condition": { "ArnEquals": {"aws:SourceArn": topic_arn} } }] } sqs_client.client.set_queue_attributes( QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)} ) # Subscribe the queue to the topic subscribe_kwargs: Dict[str, Any] = { "TopicArn": topic_arn, "Protocol": "sqs", "Endpoint": queue_arn } if attributes: subscribe_kwargs["Attributes"] = attributes response = self.client.subscribe(**subscribe_kwargs) return response["SubscriptionArn"] except ClientError as error: raise AwsClientException(error) from error
[docs] def publish_message( self, message: SnsMessage, topic_arn: Optional[str] = None, target_arn: Optional[str] = None, phone_number: Optional[str] = None, ) -> Dict[str, Any]: """ Publish a message to an SNS topic, mobile endpoint, or phone number. Sends a message to one of three destinations: an SNS topic (fan-out), a mobile platform endpoint (push notification), or a phone number (SMS). Exactly one destination must be specified. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/publish.html :param message: SnsMessage object containing the message and metadata. :param topic_arn: ARN of the SNS topic to publish to. Required if target_arn and phone_number are not specified. :param target_arn: ARN of mobile platform endpoint or device. Required if topic_arn and phone_number are not specified. :param phone_number: Phone number in E.164 format (e.g., +14155551234). Required if topic_arn and target_arn are not specified. :return: Dictionary containing: - **MessageId** (str): Unique identifier for the published message. - **SequenceNumber** (str, optional): For FIFO topics only. :raises AwsClientException: If no destination specified or if publishing fails. Example: .. code-block:: python sns = SnsClient(region="us-east-1") # Publish to topic msg = SnsMessage(Message="Hello!", Subject="Notification") response = sns.publish_message( msg, topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic" ) print(f"Published: {response['MessageId']}") # Send SMS sms_msg = SnsMessage(Message="Your code is: 123456") sns.publish_message(sms_msg, phone_number="+14155551234") # Publish to mobile endpoint push_msg = SnsMessage(Message="New message!") sns.publish_message( push_msg, target_arn="arn:aws:sns:us-east-1:123:endpoint/APNS/MyApp/abc123" ) .. """ if not any([topic_arn, target_arn, phone_number]): raise AwsClientException( "You must specify one of: topic_arn, target_arn, or phone_number" ) kwargs = {k: v for k, v in message.as_dict().items() if k != "Id"} for key, value in (("TargetArn", target_arn), ("TopicArn", topic_arn), ("PhoneNumber", phone_number)): if value: kwargs[key] = value try: return self.client.publish(**kwargs) except ClientError as error: raise AwsClientException(error) from error
[docs] def publish_batch( self, topic_arn: str, messages: List[SnsMessage] ) -> Dict[str, List[Dict[str, Any]]]: """ Publish multiple messages to an SNS topic in batches. Publishes up to 10 messages per API call. For FIFO topics, messages within a batch are published in order and deduplicated within/across batches for 5 minutes. Automatically handles larger message lists by batching into chunks of `batch_size`. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/publish_batch.html :param topic_arn: ARN of the SNS topic to publish to. :param messages: List of SnsMessage objects to publish. Each message must have a unique `Id` field within the batch. :return: Dictionary containing aggregated results from all batches: .. code-block:: python { "Successful": [ { "Id": "string", "MessageId": "string", "SequenceNumber": "string" # FIFO only } ], "Failed": [ { "Id": "string", "Code": "string", "Message": "string", "SenderFault": True | False } ] } .. :raises AwsClientException: If batch publishing fails. Example: .. code-block:: python sns = SnsClient(region="us-east-1") # Create messages with unique IDs messages = [ SnsMessage(Message="First message", Id="msg-1"), SnsMessage(Message="Second message", Id="msg-2"), SnsMessage(Message="Third message", Id="msg-3") ] # Publish batch result = sns.publish_batch( topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic", messages=messages ) print(f"Successful: {len(result['Successful'])}") print(f"Failed: {len(result['Failed'])}") # Check failures for failure in result.get('Failed', []): print(f"Failed {failure['Id']}: {failure['Message']}") .. Note: - Each message must have a unique `Id` field - Max 10 messages per batch (enforced by batch_size) - For FIFO topics, MessageGroupId is required - Total payload size per batch must be < 256 KB """ result = defaultdict(list) try: for batch in get_batches(messages, self.batch_size): response = self.client.publish_batch( TopicArn=topic_arn, PublishBatchRequestEntries=[x.as_dict() for x in batch]) for key, value in response.items(): result[key].extend(value) return dict(result) except ClientError as error: raise AwsClientException(error) from error