Services#
Service Clients#
Base Client#
Base classes for AWS service clients.
This module provides the foundational classes for all AWS service client wrappers in the core-aws library.
- class core_aws.services.base.AwsClient(service: str, **kwargs: Any)[source]#
Bases: object
Base class for all AWS service client wrappers.
This class provides a common interface for creating boto3 clients with consistent error handling and initialization patterns. All service-specific client classes should inherit from this base class.
- Parameters:
client – The underlying boto3 client instance for the AWS service.
Example
class MyServiceClient(AwsClient):
    def __init__(self, region: str = "us-east-1", **kwargs):
        super().__init__(
            service="my_service",
            region_name=region,
            **kwargs
        )

    def my_operation(self):
        return self.client.some_operation()
- __init__(service: str, **kwargs: Any) None[source]#
Initialize an AWS service client. Creates a boto3 client for the specified AWS service with the provided configuration options.
- Parameters:
service – AWS service name (e.g., ‘s3’, ‘sqs’, ‘ssm’, ‘lambda’).
kwargs –
- Additional arguments passed to boto3.client():
region_name: AWS region (e.g., ‘us-east-1’).
aws_access_key_id: AWS access key ID.
aws_secret_access_key: AWS secret access key.
aws_session_token: AWS session token.
endpoint_url: Custom endpoint URL (for LocalStack, etc.).
config: botocore.client.Config instance.
Any other boto3.client() parameters.
- Raises:
AwsClientException – If client creation fails.
Example
# Standard usage
client = AwsClient("s3", region_name="us-east-1")

# With custom endpoint (LocalStack)
client = AwsClient(
    "sqs",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1"
)
- client: BaseClient#
- exception core_aws.services.base.AwsClientException[source]#
Bases: Exception
Custom exception for AWS client operations. This exception is raised when AWS client operations fail. It can wrap the original exception or provide a custom error message.
- Usage:
# Simple pass-through (preserves stack trace with 'from')
try:
    client.some_operation()
except Exception as error:
    raise AwsClientException(error) from error

# With custom message
try:
    client.some_operation()
except Exception as error:
    raise AwsClientException(f"Failed to do X: {error}") from error

# Custom message only (no original exception)
raise AwsClientException("Operation failed: custom reason")
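The effect of `raise ... from error` in the usage above can be checked without AWS access. The sketch below is a minimal illustration, assuming `AwsClientException` behaves like a plain `Exception` subclass; the stand-in class and the `risky_operation` helper are hypothetical and exist only for this example.

```python
class AwsClientException(Exception):
    """Stand-in for core_aws.services.base.AwsClientException (assumption:
    the real class is a plain Exception subclass)."""


def risky_operation():
    # Hypothetical operation that fails the way a client call might.
    raise ConnectionError("endpoint unreachable")


def wrapped_operation():
    try:
        risky_operation()
    except Exception as error:
        # 'from error' stores the original exception on __cause__,
        # so the root cause stays available to callers and tracebacks.
        raise AwsClientException(f"Failed to do X: {error}") from error


def describe_failure():
    """Run wrapped_operation and report the wrapper message and root cause type."""
    try:
        wrapped_operation()
    except AwsClientException as exc:
        return str(exc), type(exc.__cause__).__name__
    return None
```

Callers can catch the single wrapper type and still inspect `__cause__` when they need the underlying error.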
CloudFormation Client#
AWS CloudFormation client wrapper.
This module provides a high-level interface for interacting with AWS CloudFormation, including stack description and output value retrieval operations.
- class core_aws.services.cloud_formation.client.CloudFormationClient(region: str, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS CloudFormation.
This client provides methods for interacting with CloudFormation stacks, including retrieving stack information and extracting output values. Simplifies common stack inspection operations with automatic error handling.
Example
# Initialize client
cfn = CloudFormationClient(region="us-east-1")

# Get stack details ("Outputs" is absent when the stack defines none)
stack = cfn.describe_stack(stack_name="my-app-stack")
print(f"Stack Status: {stack['StackStatus']}")
print(f"Stack Outputs: {stack.get('Outputs', [])}")

# Get specific output value by export name
vpc_id = cfn.get_output_value(
    stack_name="my-app-stack",
    export_name="MyAppVpcId"
)
if vpc_id:
    print(f"VPC ID: {vpc_id}")
- client: mypy_boto3_cloudformation.client.CloudFormationClient#
- __init__(region: str, **kwargs: Any) None[source]#
Initialize the CloudFormation client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
kwargs – Additional arguments passed to boto3.client().
- describe_stack(stack_name: str) Dict[str, Any][source]#
Retrieve detailed information about a CloudFormation stack. Returns comprehensive stack details including status, parameters, outputs, tags, and capabilities. This is a convenience wrapper around describe_stacks that returns only the first stack.
- Parameters:
stack_name – Name or ARN of the CloudFormation stack. Example: “my-app-stack” or full ARN.
- Returns:
Dictionary containing stack details:
{
    "StackId": "arn:aws:cloudformation:...",
    "StackName": "my-app-stack",
    "Description": "My application infrastructure",
    "Parameters": [
        {
            "ParameterKey": "InstanceType",
            "ParameterValue": "t3.micro"
        }
    ],
    "CreationTime": datetime(2024, 1, 1, 0, 0, 0),
    "LastUpdatedTime": datetime(2024, 6, 1, 0, 0, 0),
    "StackStatus": "CREATE_COMPLETE",  # or UPDATE_COMPLETE, etc.
    "StackStatusReason": "string",
    "Outputs": [
        {
            "OutputKey": "VpcId",
            "OutputValue": "vpc-12345678",
            "Description": "VPC ID",
            "ExportName": "MyAppVpcId"
        }
    ],
    "Tags": [
        {
            "Key": "Environment",
            "Value": "production"
        }
    ],
    "Capabilities": ["CAPABILITY_IAM"],
    "DriftInformation": {
        "StackDriftStatus": "IN_SYNC"  # or "DRIFTED", "NOT_CHECKED"
    }
}
- Raises:
AwsClientException – If stack doesn’t exist or operation fails.
Example
cfn = CloudFormationClient(region="us-east-1")

# Get stack details
stack = cfn.describe_stack(stack_name="my-app-stack")
print(f"Stack: {stack['StackName']}")
print(f"Status: {stack['StackStatus']}")
print(f"Created: {stack['CreationTime']}")

# Check stack parameters
for param in stack.get("Parameters", []):
    print(f"  {param['ParameterKey']}: {param['ParameterValue']}")

# Check stack outputs
for output in stack.get("Outputs", []):
    print(f"  {output['OutputKey']}: {output['OutputValue']}")

# Check stack status before operations
if stack["StackStatus"] in ["CREATE_COMPLETE", "UPDATE_COMPLETE"]:
    print("Stack is stable")
elif "IN_PROGRESS" in stack["StackStatus"]:
    print("Stack operation in progress")
elif "FAILED" in stack["StackStatus"]:
    print(f"Stack failed: {stack.get('StackStatusReason', 'Unknown')}")
- get_output_value(stack_name: str, export_name: str) str | None[source]#
Retrieve a specific output value from a CloudFormation stack by export name.
Searches through stack outputs to find the value associated with the specified export name. Returns None if the export name is not found. This is useful for retrieving exported resource identifiers (VPC IDs, security group IDs, etc.) from stacks.
- Parameters:
stack_name – Name or ARN of the CloudFormation stack.
export_name – Export name of the output value to retrieve. This is the “ExportName” field in the stack outputs, not the “OutputKey”.
- Returns:
Output value as string if found, None if export name doesn’t exist.
- Raises:
AwsClientException – If stack doesn’t exist or operation fails.
Example
cfn = CloudFormationClient(region="us-east-1")

# Get specific output by export name
vpc_id = cfn.get_output_value(
    stack_name="network-stack",
    export_name="MyAppVpcId"
)
if vpc_id:
    print(f"VPC ID: {vpc_id}")
else:
    print("Export 'MyAppVpcId' not found in stack outputs")

# Get multiple outputs
exports_to_fetch = [
    "MyAppVpcId",
    "MyAppSubnetId",
    "MyAppSecurityGroupId"
]
for export in exports_to_fetch:
    value = cfn.get_output_value("network-stack", export)
    if value:
        print(f"{export}: {value}")
    else:
        print(f"{export}: Not found")

# Use output value in another operation
db_endpoint = cfn.get_output_value(
    stack_name="database-stack",
    export_name="DatabaseEndpoint"
)
if db_endpoint:
    # Use the endpoint in your application
    connection_string = f"postgresql://{db_endpoint}:5432/mydb"
Note
Export names must be unique within an AWS account and region
This method searches by ExportName, not OutputKey
Returns None (not an error) if export name is not found
Automatically handles stacks without Outputs section
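The lookup described in these notes can be sketched on a plain dictionary shaped like the `describe_stack` return value. This is an illustrative sketch, not the library's implementation; the function name `find_output_by_export_name` is hypothetical.

```python
from typing import Any, Dict, Optional


def find_output_by_export_name(stack: Dict[str, Any], export_name: str) -> Optional[str]:
    """Return the OutputValue whose ExportName matches, or None if absent."""
    # .get() with a default covers stacks that have no "Outputs" section.
    for output in stack.get("Outputs", []):
        # Match on ExportName, not OutputKey; outputs without an export are skipped.
        if output.get("ExportName") == export_name:
            return output.get("OutputValue")
    return None


sample_stack = {
    "StackName": "network-stack",
    "Outputs": [
        {"OutputKey": "VpcId", "OutputValue": "vpc-12345678", "ExportName": "MyAppVpcId"},
        {"OutputKey": "InternalNote", "OutputValue": "not exported"},
    ],
}
```

Searching for `"VpcId"` (the output key) returns `None` here, which mirrors the distinction between `ExportName` and `OutputKey` noted above.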
DynamoDB Client#
AWS DynamoDB client wrapper.
This module provides a high-level interface for interacting with AWS DynamoDB, including item operations like get_item and update_item with automatic error handling.
- class core_aws.services.dynamo.client.DynamoDbClient(region: str, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS DynamoDB.
This client provides methods for interacting with DynamoDB tables, including retrieving and updating items. Supports both single-item operations and conditional updates with automatic error handling.
Example
# Initialize client
dynamodb = DynamoDbClient(region="us-east-1")

# Get item by primary key
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}}
)
if "Item" in response:
    print(f"User: {response['Item']}")

# Update item, using a placeholder for the reserved word 'name'
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET #name = :name, lastLogin = :timestamp",
    expression_attribute_values={
        ":name": {"S": "John Doe"},
        ":timestamp": {"N": "1609459200"}
    },
    ExpressionAttributeNames={"#name": "name"}
)
- client: mypy_boto3_dynamodb.client.DynamoDBClient#
- __init__(region: str, **kwargs: Any) None[source]#
Initialize the DynamoDB client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
kwargs – Additional arguments passed to boto3.client().
- get_item(table: str, key: Dict[str, Any], **kwargs: Any) Dict[str, Any][source]#
Retrieve an item from a DynamoDB table by primary key. Returns all attributes for the item with the specified primary key. If no matching item exists, the response will not contain an “Item” field. Uses eventually consistent reads by default.
- Parameters:
table – Name of the DynamoDB table.
key –
Primary key of the item to retrieve. Must include partition key and sort key (if table has one). Format:
# Table with partition key only
{"userId": {"S": "user123"}}

# Table with partition + sort key
{"userId": {"S": "user123"}, "timestamp": {"N": "1609459200"}}
kwargs –
Additional boto3 parameters:
- ConsistentRead (bool):
Use strongly consistent read instead of eventually consistent. Default: False.
- ProjectionExpression (str):
Comma-separated list of attributes to retrieve. Example: “username, email, lastLogin”
- ExpressionAttributeNames (dict):
Substitution tokens for attribute names in expressions. Example: {“#n”: “name”} (use when attribute is reserved word)
- ReturnConsumedCapacity (str):
Return capacity info: “INDEXES”, “TOTAL”, or “NONE”.
- Returns:
Dictionary containing item data (if found) and metadata:
{
    "Item": {  # Present only if item exists
        "userId": {"S": "user123"},
        "username": {"S": "john_doe"},
        "email": {"S": "john@example.com"},
        "loginCount": {"N": "42"},
        "tags": {"SS": ["premium", "verified"]},
        "metadata": {
            "M": {
                "created": {"N": "1609459200"},
                "updated": {"N": "1640995200"}
            }
        },
        "active": {"BOOL": True}
    },
    "ConsumedCapacity": {  # If ReturnConsumedCapacity specified
        "TableName": "Users",
        "CapacityUnits": 0.5
    }
}
- Raises:
AwsClientException – If the operation fails.
Example
dynamodb = DynamoDbClient(region="us-east-1")

# Get item by partition key
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}}
)
if "Item" in response:
    user = response["Item"]
    print(f"Username: {user['username']['S']}")
else:
    print("User not found")

# Get item with consistent read
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    ConsistentRead=True
)

# Get specific attributes only
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    ProjectionExpression="username, email, lastLogin"
)

# Get item from table with partition + sort key
response = dynamodb.get_item(
    table="OrderHistory",
    key={
        "userId": {"S": "user123"},
        "orderId": {"S": "order-456"}
    }
)

# Use attribute name substitution (when attribute is reserved word)
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    ProjectionExpression="#n, email",
    ExpressionAttributeNames={"#n": "name"}
)
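Items come back in DynamoDB's typed attribute format (`{"S": ...}`, `{"N": ...}`, and so on), so callers often want plain Python values. The following is a minimal standard-library sketch of that conversion; `unwrap` and `unwrap_item` are hypothetical helper names, and binary types (`B`, `BS`) are deliberately left unsupported here.

```python
from decimal import Decimal
from typing import Any, Dict


def unwrap(attr: Dict[str, Any]) -> Any:
    """Convert one typed attribute value, e.g. {"N": "42"}, to a Python value."""
    (type_tag, value), = attr.items()  # each attribute holds exactly one type tag
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # numbers travel as strings; Decimal keeps precision
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    if type_tag == "L":
        return [unwrap(v) for v in value]
    if type_tag == "M":
        return {k: unwrap(v) for k, v in value.items()}
    raise ValueError(f"Unsupported attribute type in this sketch: {type_tag}")


def unwrap_item(item: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Convert a whole "Item" mapping to plain Python values."""
    return {name: unwrap(attr) for name, attr in item.items()}


sample_item = {
    "userId": {"S": "user123"},
    "loginCount": {"N": "42"},
    "tags": {"SS": ["premium", "verified"]},
    "metadata": {"M": {"created": {"N": "1609459200"}}},
    "active": {"BOOL": True},
}
plain = unwrap_item(sample_item)
```

In a real project, boto3's own `TypeDeserializer` covers the same ground, including binary types; the sketch only shows what the type tags mean.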
- update_item(table: str, key: Dict[str, Any], expression_attribute_values: Dict[str, Any], update_expression: str, **kwargs: Any) Dict[str, Any][source]#
Update an existing item’s attributes in a DynamoDB table.
Modifies an existing item or creates a new item if it doesn’t exist. Supports SET, REMOVE, ADD, and DELETE operations on attributes. Can perform conditional updates and return old/new attribute values.
- Parameters:
table – Name of the DynamoDB table.
key – Primary key of the item to update (partition key + sort key if applicable). Example: {“userId”: {“S”: “user123”}}
expression_attribute_values – Values to substitute in the update expression. Must be prefixed with colon (:). Example: {“:name”: {“S”: “John”}, “:count”: {“N”: “1”}}
update_expression –
- Expression defining how to update the item. Supports:
SET: Set attribute value
REMOVE: Remove attribute
ADD: Increment number or add to set
DELETE: Remove from set
Example: “SET #name = :name, loginCount = loginCount + :count”
kwargs –
Additional boto3 parameters:
- ConditionExpression (str):
Condition that must be true for update to proceed. Example: “attribute_exists(userId)” or “version = :oldVersion”
- ExpressionAttributeNames (dict):
Substitution tokens for attribute names (use for reserved words). Example: {“#name”: “name”, “#status”: “status”}
- ReturnValues (str):
What values to return: “NONE” (default), “ALL_OLD”, “UPDATED_OLD”, “ALL_NEW”, “UPDATED_NEW”.
- ReturnConsumedCapacity (str):
Return capacity info: “INDEXES”, “TOTAL”, or “NONE”.
- ReturnItemCollectionMetrics (str):
Return collection metrics: “SIZE” or “NONE”.
- Returns:
Dictionary containing updated attributes (based on ReturnValues):
{
    "Attributes": {  # Present based on ReturnValues
        "userId": {"S": "user123"},
        "username": {"S": "john_doe"},
        "loginCount": {"N": "43"},
        "lastLogin": {"N": "1640995200"}
    },
    "ConsumedCapacity": {  # If ReturnConsumedCapacity specified
        "TableName": "Users",
        "CapacityUnits": 1.0
    }
}
- Raises:
AwsClientException – If the operation fails or condition is not met.
Example
dynamodb = DynamoDbClient(region="us-east-1")

# Simple SET operation
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET lastLogin = :timestamp",
    expression_attribute_values={
        ":timestamp": {"N": "1640995200"}
    }
)

# Multiple operations with attribute name substitution
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET #name = :name, email = :email, loginCount = loginCount + :inc",
    expression_attribute_values={
        ":name": {"S": "John Doe"},
        ":email": {"S": "john@example.com"},
        ":inc": {"N": "1"}
    },
    ExpressionAttributeNames={
        "#name": "name"  # 'name' is a reserved word
    }
)

# Conditional update with return values
response = dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET accountBalance = accountBalance - :amount",
    expression_attribute_values={
        ":amount": {"N": "50"},
        ":min": {"N": "0"}
    },
    ConditionExpression="accountBalance >= :min",
    ReturnValues="ALL_NEW"
)
print(f"New balance: {response['Attributes']['accountBalance']['N']}")

# REMOVE operation
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="REMOVE temporaryToken",
    expression_attribute_values={}
)

# ADD to set
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="ADD tags :newTag",
    expression_attribute_values={
        ":newTag": {"SS": ["premium"]}
    }
)

# Complex update with multiple clauses
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET #status = :active, updatedAt = :now REMOVE oldField ADD loginCount :one",
    expression_attribute_values={
        ":active": {"S": "ACTIVE"},
        ":now": {"N": "1640995200"},
        ":one": {"N": "1"}
    },
    ExpressionAttributeNames={
        "#status": "status"
    },
    ReturnValues="UPDATED_NEW"
)
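Writing SET expressions by hand gets repetitive once several attributes change at once. The sketch below builds the three related arguments from one mapping, using a name placeholder for every attribute so reserved words never need special handling. `build_set_update` is a hypothetical helper, not part of core-aws; the returned keys follow the `update_item` signature documented above.

```python
from typing import Any, Dict


def build_set_update(changes: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Build update_item arguments for a SET of typed attribute values."""
    if not changes:
        raise ValueError("At least one attribute is required")
    names: Dict[str, str] = {}
    values: Dict[str, Dict[str, Any]] = {}
    clauses = []
    for index, (attribute, typed_value) in enumerate(changes.items()):
        name_token = f"#a{index}"    # placeholder for the attribute name
        value_token = f":v{index}"   # placeholder for the attribute value
        names[name_token] = attribute
        values[value_token] = typed_value
        clauses.append(f"{name_token} = {value_token}")
    return {
        "update_expression": "SET " + ", ".join(clauses),
        "expression_attribute_values": values,
        "ExpressionAttributeNames": names,
    }


update_args = build_set_update({
    "name": {"S": "John Doe"},
    "lastLogin": {"N": "1640995200"},
})
```

Under these assumptions the result could be passed along as `dynamodb.update_item(table="Users", key=..., **update_args)`.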
ECS Client#
AWS ECS (Elastic Container Service) client wrapper.
This module provides a high-level interface for interacting with AWS ECS, including service management, task operations, and container orchestration.
- class core_aws.services.ecs.client.EcsClient(region: str, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS ECS (Elastic Container Service).
This client provides methods for managing ECS services, tasks, and container instances. Supports service updates, task listing, and service description operations with automatic error handling.
Example
# Initialize client
ecs = EcsClient(region="us-east-1")

# List services in a cluster
services = ecs.list_services(cluster="my-cluster")
print(f"Services: {services['serviceArns']}")

# Describe services
details = ecs.describe_services(
    cluster="my-cluster",
    services=["my-service"]
)

# Update service desired count
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    desiredCount=3
)

# List running tasks
tasks = ecs.list_tasks(
    cluster="my-cluster",
    serviceName="my-service"
)
- client: mypy_boto3_ecs.client.ECSClient#
- __init__(region: str, **kwargs: Any) None[source]#
Initialize the ECS client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
kwargs – Additional arguments passed to boto3.client().
- list_services(cluster: str, **kwargs: Any) Dict[str, Any][source]#
List services in an ECS cluster. Returns a list of service ARNs in the specified cluster. Results can be filtered by launch type and scheduling strategy. Supports pagination for clusters with many services.
- Parameters:
cluster – Short name or full ARN of the cluster. If not specified, uses the default cluster.
kwargs –
Additional boto3 parameters:
- nextToken (str):
Token for pagination. Use the value from a previous response to get the next page of results.
- maxResults (int):
Maximum number of results per page (1-100). Default: 10.
- launchType (str):
Filter by launch type: ‘EC2’, ‘FARGATE’, or ‘EXTERNAL’.
- schedulingStrategy (str):
Filter by scheduling strategy: ‘REPLICA’ or ‘DAEMON’.
- Returns:
Dictionary containing service ARNs and pagination info:
{
    "serviceArns": [
        "arn:aws:ecs:us-west-2:123456789012:service/my-cluster/my-service"
    ],
    "nextToken": "string",  # Present if more results available
    "ResponseMetadata": {...}
}
- Raises:
AwsClientException – If the operation fails.
Example
ecs = EcsClient(region="us-east-1")

# List all services in cluster
services = ecs.list_services(cluster="my-cluster")
print(f"Found {len(services['serviceArns'])} services")

# List Fargate services with a larger page size
services = ecs.list_services(
    cluster="my-cluster",
    launchType="FARGATE",
    maxResults=50
)

# Handle pagination
all_service_arns = []
response = ecs.list_services(cluster="my-cluster")
all_service_arns.extend(response["serviceArns"])

while "nextToken" in response:
    response = ecs.list_services(
        cluster="my-cluster",
        nextToken=response["nextToken"]
    )
    all_service_arns.extend(response["serviceArns"])
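The `nextToken` loop above recurs for every paginated call, so it can be factored into a small generator. This is a sketch under the assumption that the wrapped method accepts `nextToken` as a keyword argument and omits it from the final response; `iter_pages` and the `fake_list_services` stand-in are hypothetical and only make the example runnable offline.

```python
from typing import Any, Callable, Dict, Iterator


def iter_pages(fetch_page: Callable[..., Dict[str, Any]], **params: Any) -> Iterator[Dict[str, Any]]:
    """Yield each response page, following nextToken until it is absent."""
    response = fetch_page(**params)
    yield response
    while response.get("nextToken"):
        # Repeat the same filters on every page; only the token changes.
        response = fetch_page(**params, nextToken=response["nextToken"])
        yield response


def fake_list_services(cluster: str, nextToken: str = None, **_: Any) -> Dict[str, Any]:
    # Stand-in for ecs.list_services that serves two canned pages.
    pages = {
        None: {"serviceArns": ["svc-a", "svc-b"], "nextToken": "page-2"},
        "page-2": {"serviceArns": ["svc-c"]},
    }
    return pages[nextToken]


all_arns = [
    arn
    for page in iter_pages(fake_list_services, cluster="my-cluster")
    for arn in page["serviceArns"]
]
```

With a real client the call would look like `iter_pages(ecs.list_services, cluster="my-cluster", launchType="FARGATE")`, which keeps the filter on every page, something the hand-written loop has to remember to do.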
- describe_services(cluster: str, services: List[str], **kwargs: Any) Dict[str, Any][source]#
Retrieve detailed information about ECS services. Returns comprehensive information about specified services including configuration, deployment status, task definition, load balancers, service registries, and more. Up to 10 services can be described in a single operation.
- Parameters:
cluster – Short name or full ARN of the cluster hosting the services. Required if services are in non-default cluster.
services – List of service names or ARNs to describe (max 10 per request). Example: [“my-service”, “another-service”]
kwargs –
Additional boto3 parameters:
- include (list):
Additional information to include in response. Options: [‘TAGS’] to include resource tags.
- Returns:
Dictionary containing service details:
{
    "services": [
        {
            "serviceName": "my-service",
            "serviceArn": "arn:aws:ecs:...",
            "clusterArn": "arn:aws:ecs:...",
            "status": "ACTIVE",
            "desiredCount": 2,
            "runningCount": 2,
            "pendingCount": 0,
            "launchType": "FARGATE",
            "taskDefinition": "arn:aws:ecs:...",
            "deployments": [...],
            "events": [...],
            "loadBalancers": [...],
            "networkConfiguration": {...},
            "tags": [...]  # If include=['TAGS']
        }
    ],
    "failures": [
        {
            "arn": "arn:aws:ecs:...",
            "reason": "MISSING"
        }
    ]
}
- Raises:
AwsClientException – If the operation fails.
Example
ecs = EcsClient(region="us-east-1")

# Describe single service
details = ecs.describe_services(
    cluster="my-cluster",
    services=["my-service"]
)
service = details["services"][0]
print(f"Service: {service['serviceName']}")
print(f"Running: {service['runningCount']}/{service['desiredCount']}")
print(f"Status: {service['status']}")

# Describe multiple services with tags
details = ecs.describe_services(
    cluster="my-cluster",
    services=["service-1", "service-2", "service-3"],
    include=["TAGS"]
)
for service in details["services"]:
    print(f"{service['serviceName']}: {service['taskDefinition']}")
    for tag in service.get("tags", []):
        print(f"  {tag['key']}: {tag['value']}")

# Check for failures
if details["failures"]:
    for failure in details["failures"]:
        print(f"Failed: {failure['arn']} - {failure['reason']}")
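Because a single call describes at most 10 services, longer lists (for example, the ARNs collected from `list_services`) need to be split into batches first. A minimal sketch of that split follows; `chunked` is a hypothetical helper name.

```python
from typing import Iterator, List, Sequence


def chunked(items: Sequence[str], size: int = 10) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


service_names = [f"service-{n}" for n in range(23)]
batch_sizes = [len(batch) for batch in chunked(service_names)]
```

Each batch could then be passed as the `services` argument of one `describe_services` call, with the `services` and `failures` lists of the responses concatenated by the caller.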
- update_service(service: str, **kwargs: Any) Dict[str, Any][source]#
Update an ECS service configuration. Modifies service parameters including desired count, task definition, deployment configuration, network configuration, and task placement strategies. Applies to services that use the rolling update (ECS) deployment controller.
- Parameters:
service – Name or full ARN of the service to update.
kwargs –
Additional boto3 parameters (commonly used):
- cluster (str):
Cluster name or ARN. Default: default cluster.
- desiredCount (int):
Number of task instantiations to place and keep running.
- taskDefinition (str):
Task definition family and revision (family:revision) or full ARN.
- deploymentConfiguration (dict):
- Deployment parameters:
minimumHealthyPercent (int): Lower limit (0-100)
maximumPercent (int): Upper limit (100-200)
deploymentCircuitBreaker (dict): Circuit breaker config
- networkConfiguration (dict):
Network configuration for FARGATE launch type.
- platformVersion (str):
Platform version for Fargate tasks (e.g., “LATEST”, “1.4.0”).
- forceNewDeployment (bool):
Force new deployment even if no changes.
- healthCheckGracePeriodSeconds (int):
Grace period for load balancer health checks (0-2147483647).
- enableExecuteCommand (bool):
Enable ECS Exec for debugging.
- capacityProviderStrategy (list):
Capacity provider strategy to use.
- placementConstraints (list):
Task placement constraints.
- placementStrategy (list):
Task placement strategies.
- Returns:
Dictionary containing updated service information:
{
    "service": {
        "serviceName": "my-service",
        "serviceArn": "arn:aws:ecs:...",
        "taskDefinition": "arn:aws:ecs:...",
        "desiredCount": 3,
        "runningCount": 2,
        "pendingCount": 1,
        "deployments": [
            {
                "status": "PRIMARY",
                "taskDefinition": "arn:aws:ecs:...",
                "desiredCount": 3,
                "runningCount": 2
            }
        ],
        "events": [...]
    }
}
- Raises:
AwsClientException – If the operation fails.
Example
ecs = EcsClient(region="us-east-1")

# Update desired count
result = ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    desiredCount=5
)
print(f"Updated to {result['service']['desiredCount']} tasks")

# Update task definition
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    taskDefinition="my-task:2"
)

# Force new deployment with deployment configuration
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    forceNewDeployment=True,
    deploymentConfiguration={
        "minimumHealthyPercent": 50,
        "maximumPercent": 200,
        "deploymentCircuitBreaker": {
            "enable": True,
            "rollback": True
        }
    }
)

# Enable ECS Exec for debugging
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    enableExecuteCommand=True
)
- list_tasks(**kwargs: Any) Dict[str, Any][source]#
List tasks in an ECS cluster. Returns a list of task ARNs. Filter by cluster, task definition family, container instance, launch type, starter principal, or desired status. Recently stopped tasks appear in results for at least one hour.
- Parameters:
kwargs –
Boto3 parameters for filtering:
- cluster (str):
Cluster name or ARN. Default: default cluster.
- containerInstance (str):
Container instance ID or ARN to filter by.
- family (str):
Task definition family name to filter by.
- serviceName (str):
Service name to filter tasks belonging to that service.
- desiredStatus (str):
Filter by task status: ‘RUNNING’ (default) or ‘STOPPED’. Use ‘STOPPED’ for debugging failed/finished tasks.
- launchType (str):
Filter by launch type: ‘EC2’, ‘FARGATE’, or ‘EXTERNAL’.
- startedBy (str):
Filter by the startedBy value that was set when the task was launched. When used, it must be the only filter in the request.
- nextToken (str):
Pagination token from previous response.
- maxResults (int):
Maximum results per page (1-100). Default: 100.
- Returns:
Dictionary containing task ARNs and pagination info:
{
    "taskArns": [
        "arn:aws:ecs:us-west-2:123456789012:task/my-cluster/abc123",
        "arn:aws:ecs:us-west-2:123456789012:task/my-cluster/def456"
    ],
    "nextToken": "string",  # Present if more results available
    "ResponseMetadata": {...}
}
- Raises:
AwsClientException – If the operation fails.
Example
ecs = EcsClient(region="us-east-1")

# List all running tasks in cluster
tasks = ecs.list_tasks(cluster="my-cluster")
print(f"Running tasks: {len(tasks['taskArns'])}")

# List tasks for specific service
service_tasks = ecs.list_tasks(
    cluster="my-cluster",
    serviceName="my-service"
)

# List stopped tasks (for debugging)
stopped_tasks = ecs.list_tasks(
    cluster="my-cluster",
    desiredStatus="STOPPED",
    maxResults=50
)

# List tasks by task definition family
family_tasks = ecs.list_tasks(
    cluster="my-cluster",
    family="my-task-family"
)

# List tasks launched with a given startedBy value
# (startedBy cannot be combined with other filters; "nightly-batch" is a placeholder)
started_tasks = ecs.list_tasks(
    cluster="my-cluster",
    startedBy="nightly-batch"
)

# Handle pagination
all_task_arns = []
response = ecs.list_tasks(cluster="my-cluster")
all_task_arns.extend(response["taskArns"])

while "nextToken" in response:
    response = ecs.list_tasks(
        cluster="my-cluster",
        nextToken=response["nextToken"]
    )
    all_task_arns.extend(response["taskArns"])

print(f"Total tasks: {len(all_task_arns)}")
Kinesis Client#
AWS Kinesis Data Streams client wrapper.
This module provides a high-level interface for interacting with AWS Kinesis, including single and batch record operations with automatic retry logic.
- class core_aws.services.kinesis.client.KinesisClient(region: str, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS Kinesis Data Streams.
This client provides methods for writing records to Kinesis data streams, supporting both single and batch operations. Includes intelligent retry logic for handling transient failures and throughput exceeded errors.
Example
# Initialize client
kinesis = KinesisClient(region="us-east-1")

# Put single record
response = kinesis.put_record(
    stream_name="my-stream",
    data=b'{"event": "user_signup", "user_id": 123}',
    partition_key="user-123"
)
print(f"Shard: {response['ShardId']}")

# Put multiple records with retry
records = [
    {"event": "login", "user_id": 1},
    {"event": "purchase", "user_id": 2}
]
kinesis.send_records(
    records=records,
    stream_name="my-stream",
    partition_key="events"
)
- client: mypy_boto3_kinesis.client.KinesisClient#
- __init__(region: str, **kwargs: Any) None[source]#
Initialize the Kinesis client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
kwargs – Additional arguments passed to boto3.client().
- create_stream(stream_name: str, shard_count: int | None = None, stream_mode_details: Dict[str, str] | None = None, **kwargs: Any) Dict[str, Any][source]#
Create a new Kinesis data stream with specified capacity. Creates either a provisioned stream (with explicit shard count) or an on-demand stream (auto-scaling). Creation is asynchronous: the stream starts in the CREATING state and accepts data only once it becomes ACTIVE, which typically takes a short time.
- Parameters:
stream_name – Name of the stream to create. Must be:
1-128 characters long
Alphanumeric, hyphens, underscores, and periods only
Not prefixed with “aws:” (reserved)
Unique within the AWS account and region
shard_count –
Number of shards for provisioned capacity mode. Each shard provides:
Write: 1 MB/sec or 1,000 records/sec
Read: 2 MB/sec or 5 read transactions/sec
Required for provisioned mode, must be None for on-demand mode. Can be increased/decreased later using UpdateShardCount API.
stream_mode_details –
Stream capacity mode configuration. Dict with single key:
- StreamMode (str): Capacity mode, either:
“PROVISIONED”: Fixed shard count (requires shard_count parameter)
“ON_DEMAND”: Auto-scaling capacity (shard_count must be None)
- Default behavior if omitted:
With shard_count: Creates provisioned stream
Without shard_count: Creates on-demand stream
kwargs –
Additional boto3 parameters:
Tags (dict): Resource tags for the stream. Dictionary of key-value pairs: {“Environment”: “Production”, “Application”: “Analytics”}
- Returns:
Empty dictionary on success. Stream creation is asynchronous. Use describe_stream() or wait_until_active() to check status.
- Raises:
AwsClientException – If stream creation fails (e.g., name already exists, invalid parameters, resource limits exceeded).
Example
kinesis = KinesisClient(region="us-east-1")

# Create provisioned stream with 2 shards
kinesis.create_stream(
    stream_name="my-stream",
    shard_count=2
)

# Create on-demand stream (auto-scaling)
kinesis.create_stream(
    stream_name="my-on-demand-stream",
    stream_mode_details={"StreamMode": "ON_DEMAND"}
)

# Create stream with explicit mode specification
kinesis.create_stream(
    stream_name="my-provisioned-stream",
    shard_count=5,
    stream_mode_details={"StreamMode": "PROVISIONED"}
)

# Create stream with tags
kinesis.create_stream(
    stream_name="tagged-stream",
    shard_count=1,
    Tags={
        "Environment": "Production",
        "Application": "Analytics",
        "CostCenter": "Engineering"
    }
)

# Wait for stream to become active
waiter = kinesis.client.get_waiter('stream_exists')
waiter.wait(StreamName="my-stream")
print("Stream is now active and ready to use")
Note
Stream creation is asynchronous (completes in seconds)
Provisioned mode: Fixed cost based on shard hours
On-demand mode: Pay per GB ingested/retrieved, auto-scales
Default retention: 24 hours (configurable up to 365 days)
Maximum 50 streams per region by default (soft limit)
Stream name must be unique within account and region
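For provisioned mode, the per-shard limits quoted for `shard_count` (1 MB/sec or 1,000 records/sec for writes, 2 MB/sec for reads) give a rough way to size a stream. The sketch below is a back-of-the-envelope estimate only; `estimate_shard_count` is a hypothetical helper, and it ignores hot partition keys, the limit on read transactions, and headroom for traffic spikes.

```python
import math


def estimate_shard_count(
    write_mb_per_sec: float,
    records_per_sec: float,
    read_mb_per_sec: float = 0.0,
) -> int:
    """Estimate the shards needed so that no per-shard limit is exceeded."""
    by_write_volume = write_mb_per_sec / 1.0     # 1 MB/sec write per shard
    by_write_records = records_per_sec / 1000.0  # 1,000 records/sec per shard
    by_read_volume = read_mb_per_sec / 2.0       # 2 MB/sec read per shard
    # The tightest constraint decides; a stream needs at least one shard.
    return max(1, math.ceil(max(by_write_volume, by_write_records, by_read_volume)))
```

For example, 3.5 MB/sec of writes at 2,000 records/sec is bounded by volume and needs 4 shards, while 0.2 MB/sec at 4,500 records/sec is bounded by record rate and needs 5.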
- put_record(stream_name: str, data: bytes, partition_key: str, **kwargs: Any) Dict[str, Any][source]#
Write a single data record to a Kinesis data stream. Sends one record at a time for real-time ingestion. Each shard supports up to 1,000 records/second with a maximum write throughput of 1 MiB/second.
- Parameters:
stream_name – Name of the Kinesis data stream.
data – Data payload (bytes). Base64-encoded when serialized. Maximum size: 1 MiB (including partition key).
partition_key – Key determining which shard receives the record. Records with the same partition key go to the same shard.
kwargs –
Additional boto3 parameters:
ExplicitHashKey (str): Hash value to explicitly determine target shard, overriding partition key hash.
SequenceNumberForOrdering (str): Guarantees strictly increasing sequence numbers from same client/partition key.
- Returns:
Dictionary containing shard placement information:
{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "EncryptionType": "NONE"  # or "KMS"
}
- Raises:
AwsClientException – If record write fails.
Example
kinesis = KinesisClient(region="us-east-1")

# Put single record
response = kinesis.put_record(
    stream_name="clickstream",
    data=b'{"event": "page_view", "page": "/home"}',
    partition_key="user-123"
)
print(f"Written to {response['ShardId']}")

# With explicit hash key
kinesis.put_record(
    stream_name="orders",
    data=b'{"order_id": "12345"}',
    partition_key="order-12345",
    ExplicitHashKey="123456789"
)
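Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it; `ExplicitHashKey` supplies that integer directly. The sketch below reproduces the hash locally. The `shard_index` function additionally assumes the shards split the hash space evenly, which holds for a freshly created provisioned stream but not necessarily after resharding; both function names are hypothetical.

```python
import hashlib

HASH_KEY_SPACE = 2 ** 128  # hash keys range from 0 to 2**128 - 1


def partition_key_hash(partition_key: str) -> int:
    """Return the 128-bit integer MD5 hash of a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the target shard, ASSUMING evenly split hash key ranges."""
    range_size = HASH_KEY_SPACE // shard_count
    # The last shard absorbs any remainder of the integer division.
    return min(partition_key_hash(partition_key) // range_size, shard_count - 1)
```

The same key always lands on the same shard, which is why a single hot partition key can saturate one shard while others sit idle.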
- put_records(stream_name: str, records: List[Dict[str, Any]]) Dict[str, Any][source]#
Write multiple data records to a Kinesis data stream in a single request. Batch operation for writing up to 500 records at once. More efficient than multiple put_record() calls. Each record can be up to 1 MiB, with a total request limit of 5 MiB including partition keys. Each shard supports up to 1,000 records/second with 1 MiB/second throughput.
- Parameters:
stream_name – Name of the Kinesis data stream.
records –
List of record dictionaries (max 500). Each record structure:
{
    "Data": b"bytes",            # Required: raw bytes (boto3 base64-encodes them for transport)
    "PartitionKey": "string",    # Required: Determines shard
    "ExplicitHashKey": "string"  # Optional: Override partition key hash
}
- Returns:
Dictionary containing batch operation results:
{ "FailedRecordCount": 0, # Number of failed records "Records": [ { "SequenceNumber": "string", "ShardId": "string", "ErrorCode": "string", # Present if failed "ErrorMessage": "string" # Present if failed }, ], "EncryptionType": "NONE" | "KMS" }
- Raises:
AwsClientException – If the batch write operation fails.
Example
kinesis = KinesisClient(region="us-east-1") # Prepare batch records records = [ { "Data": b'{"event": "login", "user_id": 1}', "PartitionKey": "user-1" }, { "Data": b'{"event": "purchase", "user_id": 2}', "PartitionKey": "user-2" } ] # Put records response = kinesis.put_records( stream_name="events", records=records ) # Check for failures if response["FailedRecordCount"] > 0: for i, record in enumerate(response["Records"]): if "ErrorCode" in record: print(f"Record {i} failed: {record['ErrorMessage']}")
Note
For automatic retry logic with failed records, use send_records() instead, which handles transient failures and throughput exceeded errors.
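When handling failures manually, the entries in the response "Records" list line up by position with the records that were sent, so the failed ones can be picked out by pairing the two lists. The helper below is a minimal sketch; collect_failed_records is a hypothetical name, not part of the library, and the response shown is made up for illustration.

```python
from typing import Any, Dict, List


def collect_failed_records(
    sent: List[Dict[str, Any]], response: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Return the request entries whose matching result carries an ErrorCode."""
    if response.get("FailedRecordCount", 0) == 0:
        return []
    return [
        record
        for record, result in zip(sent, response["Records"])
        if "ErrorCode" in result
    ]


sent = [
    {"Data": b'{"event": "login"}', "PartitionKey": "user-1"},
    {"Data": b'{"event": "purchase"}', "PartitionKey": "user-2"},
]
# Illustrative response: the second record was throttled.
response = {
    "FailedRecordCount": 1,
    "Records": [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"},
        {
            "ErrorCode": "ProvisionedThroughputExceededException",
            "ErrorMessage": "Rate exceeded",
        },
    ],
}
to_retry = collect_failed_records(sent, response)
print(len(to_retry))
```

The returned list can be passed to put_records() again as-is.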
- send_records(records: List[Dict[str, Any]], stream_name: str, partition_key: str, records_per_request: int = 500, max_attempts: int = 10, interval_between_attempt: int = 1) None[source]#
Send records to Kinesis with automatic retry logic. High-level wrapper around put_records() that automatically handles transient failures and throughput exceeded errors. Converts Python dictionaries to JSON, batches records, and retries failed records with linear backoff.
- Parameters:
records – List of record dictionaries to send. Each dict will be JSON-serialized. Example: [{"event": "login", "user_id": 123}, …]
stream_name – Name of the target Kinesis data stream.
partition_key – Partition key for all records. All records will be sent to the same shard based on this key.
records_per_request – Maximum number of records per batch request. Default: 500 (AWS maximum).
max_attempts – Maximum retry attempts for failed records. Default: 10.
interval_between_attempt – Base delay in seconds between retry attempts. The delay grows linearly (delay = interval * attempt_number). Default: 1 second.
- Raises:
AwsClientException – If records still fail after max_attempts retries.
Example
kinesis = KinesisClient(region="us-east-1") # Send event records with automatic retry events = [ {"event": "login", "user_id": 123, "timestamp": "2024-01-01T10:00:00Z"}, {"event": "purchase", "user_id": 456, "amount": 99.99}, {"event": "logout", "user_id": 123} ] kinesis.send_records( records=events, stream_name="user-events", partition_key="events" ) # With custom retry settings kinesis.send_records( records=events, stream_name="critical-events", partition_key="events", records_per_request=100, # Smaller batches max_attempts=20, # More retries interval_between_attempt=2 # Longer delays )
Note
Records are automatically JSON-serialized using json.dumps()
Failed records are automatically retried with linear backoff
Delay between retries: 1s, 2s, 3s, 4s, … (interval * attempt)
All records use the same partition key (same shard)
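The conversion from plain dictionaries to Kinesis entries can be sketched as follows. This is only an illustration of the behaviour described above: to_kinesis_entries is a hypothetical helper, and encoding the JSON text as UTF-8 bytes is an assumption about how the payload is produced.

```python
import json
from typing import Any, Dict, List


def to_kinesis_entries(
    records: List[Dict[str, Any]], partition_key: str
) -> List[Dict[str, Any]]:
    """Serialize each dict to JSON and attach the shared partition key."""
    return [
        {
            # Assumption: JSON text is sent as UTF-8 bytes.
            "Data": json.dumps(record).encode("utf-8"),
            "PartitionKey": partition_key,
        }
        for record in records
    ]


events = [
    {"event": "login", "user_id": 123},
    {"event": "logout", "user_id": 123},
]
entries = to_kinesis_entries(events, partition_key="events")
print(entries[0]["PartitionKey"], len(entries))
```

Since every entry carries the same partition key, all of them land on one shard; callers that need to spread load across shards would use put_records() with per-record keys instead.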
- _send_to_kinesis_stream(records: List[Dict[str, Any]], stream_name: str, records_per_request: int = 500, max_attempts: int = 10, interval_between_attempt: int = 1) None[source]#
Internal implementation for sending records with retry logic.
Handles batching, retry logic with linear backoff, and error tracking. Called by send_records() to perform the actual data transfer. This method splits large record sets into batches and retries failed records automatically.
- Parameters:
records –
List of formatted record dictionaries ready for Kinesis API.
[{ "Data": b"bytes", # JSON-serialized data (bytes or string) "PartitionKey": "string", # Required "ExplicitHashKey": "string" # Optional }]
stream_name – Name of the Kinesis data stream.
records_per_request – Number of records per batch (max 500). Default: 500.
max_attempts – Maximum retry attempts for failed records. Default: 10.
interval_between_attempt – Base interval in seconds for linear backoff. Default: 1.
- Raises:
AwsClientException – If any records still fail after max_attempts retries.
Algorithm:
Split records into batches of records_per_request size
For each batch:
Send to Kinesis using put_records()
Check for failed records in response
If failures exist and attempts remain:
Wait: interval * attempt_number seconds
Extract only failed records
Retry the failed records
Repeat until success or max_attempts reached
Raise exception if any records still failed
Note
This is an internal method. Use send_records() instead.
Uses linear backoff: 1s, 2s, 3s, 4s, 5s, …
Only retries records that actually failed (not entire batch)
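The algorithm above can be sketched without any AWS dependency. This is not the library's implementation: send_with_retry and SendError are hypothetical names, put_records is injected as a plain callable, and treating max_attempts as the total number of tries per batch is an assumption.

```python
import time
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


class SendError(Exception):
    """Stand-in for AwsClientException in this sketch."""


def send_with_retry(
    records: List[Record],
    put_records: Callable[[List[Record]], Dict[str, Any]],
    records_per_request: int = 500,
    max_attempts: int = 10,
    interval_between_attempt: float = 1,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Send records in batches, retrying only the failed ones."""
    for start in range(0, len(records), records_per_request):
        pending = records[start:start + records_per_request]
        for attempt in range(1, max_attempts + 1):
            response = put_records(pending)
            # Keep only the records whose result reports an error.
            pending = [
                record
                for record, result in zip(pending, response["Records"])
                if "ErrorCode" in result
            ]
            if not pending:
                break
            if attempt < max_attempts:
                # Linear backoff: interval * attempt_number.
                sleep(interval_between_attempt * attempt)
        if pending:
            raise SendError(
                f"{len(pending)} records failed after {max_attempts} attempts"
            )


calls: List[int] = []
delays: List[float] = []


def flaky_put_records(batch: List[Record]) -> Dict[str, Any]:
    """Fake API: rejects the first record on the first call only."""
    calls.append(len(batch))
    results: List[Dict[str, Any]] = [
        {"SequenceNumber": "1", "ShardId": "shardId-000000000000"} for _ in batch
    ]
    if len(calls) == 1:
        results[0] = {"ErrorCode": "Throttled", "ErrorMessage": "Rate exceeded"}
    failed = sum("ErrorCode" in r for r in results)
    return {"FailedRecordCount": failed, "Records": results}


demo = [{"Data": b"{}", "PartitionKey": "events"} for _ in range(3)]
send_with_retry(demo, flaky_put_records, sleep=delays.append)
print(calls, delays)
```

Passing sleep as a parameter keeps the sketch testable: the demo records the requested delays instead of actually waiting.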
Lambdas#
The following links contain information related to resources for Lambda Functions…
S3 Client#
AWS S3 (Simple Storage Service) client wrapper.
This module provides a high-level interface for interacting with AWS S3, including bucket operations, object upload/download, and batch operations.
- class core_aws.services.s3.client.S3ACL(*values)[source]#
Bases: StrEnum
Enum for S3 canned ACL (Access Control List) values.
These are predefined access control lists that define common access patterns for S3 buckets and objects. AWS recommends using bucket policies and IAM policies instead of ACLs for most access control needs.
- PRIVATE = 'private'#
Owner gets FULL_CONTROL. No one else has access rights.
- PUBLIC_READ = 'public-read'#
Owner gets FULL_CONTROL. AllUsers group gets READ access.
- PUBLIC_READ_WRITE = 'public-read-write'#
Owner gets FULL_CONTROL. AllUsers group gets READ and WRITE access.
- AUTHENTICATED_READ = 'authenticated-read'#
Owner gets FULL_CONTROL. AuthenticatedUsers group gets READ access.
- static _generate_next_value_(name, start, count, last_values)#
Return the lower-cased version of the member name.
- class core_aws.services.s3.client.S3ObjectOwnership(*values)[source]#
Bases: StrEnum
Enum for S3 object ownership settings.
Controls object ownership and whether ACLs are enabled for the bucket. This setting applies to all objects in the bucket.
- BUCKET_OWNER_ENFORCED = 'BucketOwnerEnforced'#
ACLs are disabled. Bucket owner automatically owns and has full control over all objects in the bucket. All access control must be done via bucket policies and IAM policies.
- BUCKET_OWNER_PREFERRED = 'BucketOwnerPreferred'#
Bucket owner owns objects if they are uploaded with the bucket-owner-full-control canned ACL. Otherwise, object writer retains ownership.
- OBJECT_WRITER = 'ObjectWriter'#
Object uploader retains ownership. The AWS account that uploads an object owns the object, has full control over it, and can grant other users access to it through ACLs.
- static _generate_next_value_(name, start, count, last_values)#
Return the lower-cased version of the member name.
- class core_aws.services.s3.client.S3Client(signature_version: str = 's3v4', **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS S3 (Simple Storage Service).
This client provides methods for managing S3 buckets and objects, including upload, download, copy, delete, and list operations. Supports both single-part and multipart operations with automatic pagination handling.
Example
# Initialize client s3 = S3Client(region_name="us-east-1") # Check if bucket exists if s3.does_bucket_exist("my-bucket"): print("Bucket exists") # Upload a file s3.upload_file( bucket="my-bucket", key="data/file.csv", path="/local/path/file.csv" ) # List all objects for obj in s3.list_all_objects("my-bucket", "data/"): print(f"{obj['Key']}: {obj['Size']} bytes") # Download a file s3.download_file( bucket="my-bucket", key="data/file.csv", local_path="/local/download/file.csv" )
- client: mypy_boto3_s3.client.S3Client#
- __init__(signature_version: str = 's3v4', **kwargs: Any) None[source]#
Initialize the S3 client.
- Parameters:
signature_version – S3 signature version. Default: “s3v4” (recommended). Options: “s3v4” (AWS Signature Version 4), “s3” (legacy).
kwargs –
- Additional arguments passed to boto3.client():
region_name: AWS region (e.g., ‘us-east-1’)
endpoint_url: Custom endpoint (for LocalStack, MinIO, etc.)
aws_access_key_id, aws_secret_access_key, aws_session_token
Example
# Standard S3 s3 = S3Client(region_name="us-east-1") # With custom endpoint (LocalStack) s3_local = S3Client( endpoint_url="http://localhost:4566", region_name="us-east-1" ) # Legacy signature version s3_legacy = S3Client(signature_version="s3")
- create_bucket(bucket: str, acl: S3ACL | str | None = None, object_ownership: S3ObjectOwnership | str | None = None, **kwargs: Any) Dict[str, Any][source]#
Create a new S3 bucket in the specified region. Bucket names must be globally unique across all AWS accounts. For regions outside us-east-1, a LocationConstraint must be specified.
- Parameters:
bucket –
Name of the bucket to create. Requirements:
3-63 characters long
Lowercase letters, numbers, hyphens, and periods only
Must start and end with a letter or number
Must not be formatted as an IP address
Globally unique across all AWS accounts
acl –
Optional canned ACL to apply to the bucket. Options:
private (default): Owner gets FULL_CONTROL. No one else has access.
public-read: Owner gets FULL_CONTROL. AllUsers get READ access.
public-read-write: Owner gets FULL_CONTROL. AllUsers get READ and WRITE access.
authenticated-read: Owner gets FULL_CONTROL. AuthenticatedUsers get READ access.
Note: AWS recommends using bucket policies instead of ACLs for access control.
object_ownership –
Optional object ownership setting. Controls object ownership and ACL behavior:
BucketOwnerEnforced: ACLs disabled, bucket owner owns all objects
BucketOwnerPreferred: Bucket owner owns objects if uploaded with bucket-owner-full-control ACL
ObjectWriter: Object uploader retains ownership (default legacy behavior)
kwargs –
Additional boto3 parameters:
- CreateBucketConfiguration (dict): Bucket configuration with:
LocationConstraint (str): AWS region (e.g., ‘us-west-2’). Required for all regions except us-east-1.
GrantFullControl (str): Grants READ, WRITE, READ_ACP, and WRITE_ACP permissions
GrantRead (str): Grants READ permission
GrantReadACP (str): Grants READ_ACP permission
GrantWrite (str): Grants WRITE permission
GrantWriteACP (str): Grants WRITE_ACP permission
ObjectLockEnabledForBucket (bool): Enable object lock (requires versioning)
- Returns:
Dictionary containing bucket creation response:
{ "Location": "string" # URI of the created bucket }
- Raises:
AwsClientException – If bucket creation fails (e.g., name already exists, invalid name, permission denied).
Example
from core_aws.services.s3.client import S3Client, S3ACL, S3ObjectOwnership s3 = S3Client(region_name="us-east-1") # Create bucket in us-east-1 (default region) response = s3.create_bucket(bucket="my-unique-bucket-name") print(f"Created bucket at: {response['Location']}") # Create bucket in a specific region s3_west = S3Client(region_name="us-west-2") response = s3_west.create_bucket( bucket="my-west-bucket", CreateBucketConfiguration={ "LocationConstraint": "us-west-2" } ) # Create bucket with object ownership controls (using enum) response = s3.create_bucket( bucket="my-controlled-bucket", object_ownership=S3ObjectOwnership.BUCKET_OWNER_ENFORCED ) # Create private bucket with explicit ACL (using enum) response = s3.create_bucket( bucket="my-private-bucket", acl=S3ACL.PRIVATE ) # Create public-read bucket (string also works) response = s3.create_bucket( bucket="my-public-bucket", acl="public-read" ) # Create bucket with object lock enabled response = s3.create_bucket( bucket="my-locked-bucket", ObjectLockEnabledForBucket=True )
Note
Bucket names must be globally unique across all AWS accounts
For regions other than us-east-1, must specify CreateBucketConfiguration
Bucket creation is eventually consistent
Maximum 100 buckets per AWS account by default
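The local naming rules and the region handling described above can be checked before calling create_bucket(). The sketch below is illustrative only: is_valid_bucket_name and region_kwargs are hypothetical helpers, they cover just the rules listed in this section, and global uniqueness can only be verified by AWS itself.

```python
import ipaddress
import re
from typing import Any, Dict

# 3-63 chars; lowercase letters, digits, hyphens, periods;
# must start and end with a letter or digit.
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name: str) -> bool:
    """Check the naming rules listed for the bucket parameter."""
    if not _BUCKET_NAME.fullmatch(name):
        return False
    try:
        ipaddress.IPv4Address(name)
    except ValueError:
        return True  # Not an IP address, so the name is acceptable.
    return False


def region_kwargs(region: str) -> Dict[str, Any]:
    """Build the extra kwargs needed for regions other than us-east-1."""
    if region == "us-east-1":
        return {}
    return {"CreateBucketConfiguration": {"LocationConstraint": region}}


print(is_valid_bucket_name("my-unique-bucket-name"))
print(region_kwargs("us-west-2"))
```

A call could then look like s3.create_bucket(bucket=name, **region_kwargs("us-west-2")), assuming the client was created for the same region.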
- does_bucket_exist(bucket: str) bool[source]#
Check if an S3 bucket exists and is accessible. Performs a lightweight list operation to verify bucket existence and access permissions.
- Parameters:
bucket – Name of the S3 bucket to check.
- Returns:
True if bucket exists and is accessible, False if bucket doesn’t exist.
- Raises:
AwsClientException – If access is denied or other errors occur.
Example
s3 = S3Client(region_name="us-east-1") if s3.does_bucket_exist("my-bucket"): print("Bucket exists and is accessible") else: print("Bucket does not exist")
- list_objects(bucket: str, prefix: str = '', **kwargs: Any) Dict[str, Any][source]#
List objects in an S3 bucket (single page, up to 1000 objects). Returns a single page of results. For listing all objects with automatic pagination, use list_all_objects() instead.
- Parameters:
bucket – Name of the S3 bucket.
prefix – Object key prefix filter (e.g., “folder/subfolder/”).
kwargs – Additional boto3 parameters (Delimiter, MaxKeys, Marker, etc.).
- Returns:
Dictionary containing object listing with structure:
{ "IsTruncated": True|False, "Marker": "string", "NextMarker": "string", "Contents": [ { "Key": "string", "LastModified": datetime(2015, 1, 1), "ETag": "string", "ChecksumAlgorithm": [ "CRC32"|"CRC32C"|"SHA1"|"SHA256", ], "Size": 123, "StorageClass": "STANDARD" | ... | "GLACIER", "Owner": { "DisplayName": "string", "ID": "string" }, "RestoreStatus": { "IsRestoreInProgress": True|False, "RestoreExpiryDate": datetime(2015, 1, 1) } }, ], "Name": "string", "Prefix": "string", "Delimiter": "string", "MaxKeys": 123, "CommonPrefixes": [ { "Prefix": "string" }, ], "EncodingType": "url", "RequestCharged": "requester" }
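When a page is truncated, the next request has to pass a Marker. A minimal sketch of manual pagination over this response shape is shown below; iter_objects is a hypothetical helper and fake_list_objects stands in for a real listing call, so only the loop logic is meant to carry over. NextMarker is only returned when a Delimiter is used, so the sketch falls back to the last key of the page.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_objects(
    list_page: Callable[..., Dict[str, Any]], bucket: str, prefix: str = ""
) -> Iterator[Dict[str, Any]]:
    """Yield objects from every page by following the marker."""
    marker = None
    while True:
        extra = {"Marker": marker} if marker else {}
        page = list_page(bucket, prefix, **extra)
        contents = page.get("Contents", [])
        yield from contents
        if not page.get("IsTruncated") or not contents:
            return
        # Fall back to the last key when NextMarker is absent.
        marker = page.get("NextMarker") or contents[-1]["Key"]


KEYS: List[str] = [f"data/file-{i}.csv" for i in range(5)]


def fake_list_objects(
    bucket: str, prefix: str = "", Marker: str = "", MaxKeys: int = 2
) -> Dict[str, Any]:
    """Fake single-page listing that returns at most MaxKeys keys."""
    matching = [k for k in KEYS if k.startswith(prefix) and k > Marker]
    return {
        "IsTruncated": len(matching) > MaxKeys,
        "Contents": [{"Key": k, "Size": 1} for k in matching[:MaxKeys]],
    }


listed = [obj["Key"] for obj in iter_objects(fake_list_objects, "my-bucket", "data/")]
print(listed)
```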
- list_all_objects(bucket: str, prefix: str = '', **kwargs: Any) Iterator[Dict[str, Any]][source]#
Retrieve information about all objects (files) in an S3 bucket. Pagination is handled automatically, so callers don't need to manage markers themselves.
- Parameters:
bucket – Bucket name.
prefix – Objects prefix.
kwargs
- Returns:
An iterator of dictionaries with the following structure:
{ "Key": "string", "LastModified": datetime(2015, 1, 1), "ETag": "string", "ChecksumAlgorithm": [ "CRC32"|"CRC32C"|"SHA1"|"SHA256", ], "Size": 123, "StorageClass": "STANDARD" | ... | "GLACIER", "Owner": { "DisplayName": "string", "ID": "string" }, "RestoreStatus": { "IsRestoreInProgress": True|False, "RestoreExpiryDate": datetime(2015, 1, 1) } }
- upload_file(bucket: str, key: str, path: str, **kwargs: Any) None[source]#
Upload a local file to an S3 bucket. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_file.html
- Parameters:
bucket – Name of the destination bucket.
key – Key/path of the object.
path – Path to the local file to upload.
kwargs
- Returns:
None.
- upload_object(bucket: str, key: str, data: BytesIO, **kwargs) None[source]#
Upload a file-like object (BytesIO) to an S3 bucket. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_fileobj
- Parameters:
bucket – Name of the destination bucket.
key – Key/path of the object.
data – A file-like object to upload. At a minimum, it must implement the read method, and must return bytes.
kwargs
- download_file(bucket: str, key: str, local_path: str, **kwargs) str[source]#
Download a file from an S3 bucket to a local path. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/download_file.html
- Parameters:
local_path – Path to save the file locally.
bucket – The bucket name of the bucket containing the object.
key – Key/path of the object.
kwargs
- Returns:
The local path where the file was stored.
- download_object(buffer: BytesIO, bucket: str, key: str, **kwargs) None[source]#
Download an object from S3 to a file-like object. The file-like object must be in binary mode.
- Parameters:
bucket – The bucket name of the bucket containing the object.
key – Key/path of the object.
buffer – A file-like object to download into. At a minimum, it must implement the write method and must accept bytes.
kwargs
- copy(copy_source: Dict, bucket: str, key: str, **kwargs) None[source]#
Copy an object from one S3 location to another. This is a managed transfer which will perform a multipart copy in multiple threads if necessary…
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.copy
- Parameters:
copy_source (dict) – The name of the source bucket, key name of the source object, and optional version ID of the source object. The dictionary format is:
{"Bucket": "bucket", "Key": "key", "VersionId": "id"}. Note that the VersionId key is optional and may be omitted.
bucket (str) – The name of the bucket to copy to
key (str) – The name of the key to copy to
kwargs –
Extra arguments:
- ExtraArgs: dict
Extra arguments that may be passed to the client operation.
- Callback: function
A method which takes a number of bytes transferred to be periodically called during the copy.
- SourceClient: botocore or boto3 Client
The client to be used for operation that may happen at the source object. For example, this client is used for the head_object that determines the size of the copy. If no client is provided, the current client is used as the client for the source object.
- Config: boto3.s3.transfer.TransferConfig
The transfer configuration to be used when performing the copy.
- delete_object(bucket: str, key: str, **kwargs) Dict[source]#
Delete a single object from a bucket. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_object.html
- Parameters:
bucket – The bucket name of the bucket containing the object.
key – Key name of the object to delete.
kwargs
- Returns:
{ "DeleteMarker": True|False, "VersionId": "string", "RequestCharged": "requester" }
- delete_objects(bucket: str, objects: List[Dict], **kwargs) Dict[source]#
Delete multiple objects from a bucket using a single request. Reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_objects.html
- Parameters:
bucket – Name of the bucket containing the objects.
objects –
The objects to delete. Each entry is an object identifier with the following fields:
Key (string) [REQUIRED]: Key name of the object to delete.
VersionId (string): Version ID of the specific object version to delete.
Example:
[{ "Key": "some_file.csv", "VersionId": "6LGg7gQLhY41.maGB5Z6SWW.dcq0vx7b", }]
kwargs
- Returns:
{ "Deleted": [ { "Key": "string", "VersionId": "string", "DeleteMarker": True|False, "DeleteMarkerVersionId": "string" }, ], "RequestCharged": "requester", "Errors": [ { "Key": "string", "VersionId": "string", "Code": "string", "Message": "string" }, ] }
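The S3 DeleteObjects API accepts at most 1,000 keys per request, so larger key lists have to be split before calling delete_objects(). The following is a rough sketch with hypothetical helper names (delete_batches, describe_errors); whether the wrapper splits requests itself is not stated here, so the sketch assumes it does not.

```python
from typing import Any, Dict, Iterator, List

MAX_KEYS_PER_DELETE = 1000  # Limit of a single DeleteObjects request


def delete_batches(
    keys: List[str], size: int = MAX_KEYS_PER_DELETE
) -> Iterator[List[Dict[str, str]]]:
    """Yield object-identifier lists of at most `size` entries."""
    for start in range(0, len(keys), size):
        yield [{"Key": key} for key in keys[start:start + size]]


def describe_errors(response: Dict[str, Any]) -> List[str]:
    """Summarize the per-object errors of a delete_objects response."""
    return [
        f"{error['Key']}: {error['Code']} ({error['Message']})"
        for error in response.get("Errors", [])
    ]


keys = [f"logs/part-{i}.csv" for i in range(2500)]
batch_sizes = [len(batch) for batch in delete_batches(keys)]
print(batch_sizes)

# Illustrative response with one failed key.
sample_response = {
    "Deleted": [{"Key": "logs/part-0.csv"}],
    "Errors": [
        {"Key": "logs/part-1.csv", "Code": "AccessDenied", "Message": "Access Denied"}
    ],
}
print(describe_errors(sample_response))
```

Each yielded batch can be passed as the objects argument; the "Errors" list should be checked even when the call itself succeeds, since individual keys can fail.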
SNS Client#
AWS Simple Notification Service (SNS) client wrapper.
This module provides a high-level interface for interacting with AWS SNS, including message publishing to topics, batch operations, and SMS delivery.
- class core_aws.services.sns.client.SnsMessage(Message: Dict | str, Id: str | None = None, Subject: str | None = None, MessageStructure: str | None = None, MessageAttributes: Dict | None = None, MessageDeduplicationId: str | None = None, MessageGroupId: str | None = None)[source]#
Bases: object
Represents an SNS message to be published to a topic, phone number, or endpoint.
This class encapsulates all parameters needed to publish a message via SNS, supporting both simple string messages and structured JSON messages for multiprotocol delivery.
Example
# Simple text message msg = SnsMessage(Message="Hello, SNS!") # Structured message with subject msg = SnsMessage( Message="Important notification", Subject="Alert", Id="msg-001" ) # Dict message (auto-converted to JSON) msg = SnsMessage( Message={"user_id": 123, "event": "signup"}, Id="msg-002" ) # FIFO topic message msg = SnsMessage( Message="Order processed", MessageGroupId="orders", MessageDeduplicationId="order-12345" )
- __init__(Message: Dict | str, Id: str | None = None, Subject: str | None = None, MessageStructure: str | None = None, MessageAttributes: Dict | None = None, MessageDeduplicationId: str | None = None, MessageGroupId: str | None = None) None[source]#
Initialize an SNS message.
- Parameters:
Message –
The message content to send. Can be either a string or dict.
String: Sent as-is to all transport protocols.
Dict: Automatically converted to JSON structure with “default” key.
- Constraints:
Except for SMS: UTF-8 strings, max 256 KB (262,144 bytes).
SMS: Max 140-160 characters depending on encoding. Messages longer than limit are split. Total SMS limit: 1,600 characters.
Id – Unique identifier for the message within a batch. Required for publish_batch() operation, ignored for single publish() calls.
Subject – Optional subject line for email endpoints. Also included in standard JSON messages delivered to other endpoints.
MessageStructure –
Set to “json” to send different messages per protocol.
- When set to “json”, the Message parameter must:
Be valid JSON
Contain at least a “default” key with a string value
Optionally contain protocol-specific keys (“http”, “sms”, “email”, etc.)
- Example JSON structure:
{"default": "Default message", "sms": "Short SMS", "email": "Detailed email"}
MessageAttributes – Custom message attributes for filtering and routing. Dict of attribute names to attribute values with DataType and StringValue/BinaryValue.
MessageDeduplicationId –
FIFO topics only. Deduplication token (up to 128 alphanumeric + punctuation). Messages with same ID within 5 minutes are treated as duplicates.
If topic has ContentBasedDeduplication, this overrides auto-generated ID.
MessageGroupId – FIFO topics only (required). Message group tag (up to 128 alphanumeric + punctuation). Messages in same group are processed in FIFO order. Messages in different groups may process out of order.
- as_dict() Dict[str, Any][source]#
Convert the message to a dictionary for SNS API calls. Automatically handles dict messages by converting them to JSON structure with “default” key. Removes None/empty values.
- Returns:
Dictionary ready for SNS publish/publish_batch API.
Example
msg = SnsMessage(Message="Hello", Subject="Test", Id="msg-1") payload = msg.as_dict() # {"Id": "msg-1", "Message": "Hello", "Subject": "Test"} msg2 = SnsMessage(Message={"data": 123}) payload2 = msg2.as_dict() # { # "Message": '{"default": "{\"data\": 123}"}', # "MessageStructure": "json" # }
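The dict handling shown in the example can be reproduced with two nested json.dumps() calls. The function below is a sketch of that behaviour under the assumption that empty values are simply dropped; build_payload is a hypothetical name and not the method's actual implementation.

```python
import json
from typing import Any, Dict, Union


def build_payload(message: Union[Dict[str, Any], str], **fields: Any) -> Dict[str, Any]:
    """Build a publish payload, wrapping dict messages under a "default" key."""
    payload: Dict[str, Any] = dict(fields)
    if isinstance(message, dict):
        # Inner dumps: the dict becomes a JSON string.
        # Outer dumps: that string becomes the value of "default".
        payload["Message"] = json.dumps({"default": json.dumps(message)})
        payload["MessageStructure"] = "json"
    else:
        payload["Message"] = message
    # Drop None and empty values.
    return {key: value for key, value in payload.items() if value}


plain = build_payload("Hello", Subject="Test", Id="msg-1", MessageGroupId=None)
structured = build_payload({"data": 123})
print(plain)
print(structured["MessageStructure"])
```

The double encoding is needed because, with MessageStructure set to "json", each protocol entry must itself be a string rather than a nested object.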
- class core_aws.services.sns.client.SnsClient(region: str, batch_size: int = 10, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS Simple Notification Service (SNS). This client provides methods for publishing messages to SNS topics, endpoints, and phone numbers. Supports both single and batch operations.
Example
# Initialize client sns = SnsClient(region="us-east-1") # Publish to topic msg = SnsMessage(Message="Hello, SNS!", Subject="Notification") sns.publish_message(msg, topic_arn="arn:aws:sns:us-east-1:123:my-topic") # Publish multiple messages messages = [ SnsMessage(Message="Message 1", Id="msg-1"), SnsMessage(Message="Message 2", Id="msg-2") ] sns.publish_batch("arn:aws:sns:us-east-1:123:my-topic", messages)
- client: mypy_boto3_sns.client.SNSClient#
- __init__(region: str, batch_size: int = 10, **kwargs: Any) None[source]#
Initialize the SNS client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
batch_size – Maximum messages per batch (default: 10, max: 10).
kwargs – Additional arguments passed to boto3.client().
- create_topic(name: str, attributes: Dict[str, str] | None = None, tags: List[Dict[str, str]] | None = None, data_protection_policy: str | None = None) str[source]#
Create a new SNS topic. Creates a standard or FIFO topic based on name suffix (.fifo for FIFO topics). Returns the topic ARN for publishing messages and subscribing endpoints.
- Parameters:
name – Topic name. Must be 1-256 characters, alphanumeric plus hyphens and underscores. For FIFO topics, name must end with “.fifo” suffix.
attributes –
Optional topic configuration attributes. Common attributes:
DeliveryPolicy: JSON string for delivery retry policy
DisplayName: Human-readable name for SMS sender (max 100 chars)
FifoTopic: “true” for FIFO topic (auto-set if name ends with .fifo)
ContentBasedDeduplication: “true” to enable content-based deduplication (FIFO only)
KmsMasterKeyId: AWS KMS key ID for encryption
Policy: JSON string for topic access policy
tags –
Optional list of tags to apply to the topic. Each tag is a dict with ‘Key’ and ‘Value’ fields. Maximum 50 tags per topic.
Example: [{"Key": "Environment", "Value": "Production"}]
data_protection_policy – Optional JSON string defining data protection policy for sensitive data scanning and redaction.
- Returns:
The ARN (Amazon Resource Name) of the created topic.
- Raises:
AwsClientException – If topic creation fails.
Example
sns = SnsClient(region="us-east-1") # Create standard topic topic_arn = sns.create_topic(name="my-notifications") print(f"Created topic: {topic_arn}") # Create FIFO topic with attributes fifo_arn = sns.create_topic( name="my-orders.fifo", attributes={ "FifoTopic": "true", "ContentBasedDeduplication": "true" }, tags=[ {"Key": "Environment", "Value": "Production"}, {"Key": "Application", "Value": "OrderProcessing"} ] ) # Create topic with KMS encryption encrypted_arn = sns.create_topic( name="secure-topic", attributes={ "KmsMasterKeyId": "alias/aws/sns", "DisplayName": "Secure Notifications" } )
Note
Topic names are case-sensitive
Creating a topic with existing name is idempotent (returns existing ARN)
FIFO topics support message ordering and deduplication
Standard topics offer best-effort ordering and at-least-once delivery
- subscribe_sqs_queue(topic_arn: str, queue_name: str, region: str | None = None, set_queue_policy: bool = True, attributes: Dict[str, str] | None = None) str[source]#
Subscribe an SQS queue to an SNS topic by queue name. This is a convenience method that handles the entire subscription process, including setting up the necessary queue policy to allow SNS to send messages.
- Reference:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/subscribe.html
- Parameters:
topic_arn – ARN of the SNS topic to subscribe the queue to.
queue_name – Name of the SQS queue to subscribe. The queue must exist in the same region as the SNS client or in the specified region.
region – Optional AWS region where the SQS queue exists. If not specified, uses the same region as the SNS client.
set_queue_policy – If True (default), automatically sets the SQS queue policy to allow the SNS topic to send messages. Set to False if you want to manage the queue policy manually.
attributes –
Optional subscription attributes. Common attributes:
RawMessageDelivery: “true” to deliver raw messages without SNS envelope (default: “false”)
FilterPolicy: JSON string for message filtering based on attributes
RedrivePolicy: JSON string defining dead-letter queue for failed deliveries
- Returns:
The subscription ARN for the created subscription.
- Raises:
AwsClientException – If subscription fails or queue doesn’t exist.
Example
import json from core_aws.services.sns.client import SnsClient from core_aws.services.sqs.client import SqsClient sns = SnsClient(region="us-east-1") sqs = SqsClient(region="us-east-1") # Create topic and queue topic_arn = sns.create_topic(name="notifications") queue_url = sqs.create_queue(queue_name="notification-queue") # Subscribe queue to topic (automatically sets queue policy) subscription_arn = sns.subscribe_sqs_queue( topic_arn=topic_arn, queue_name="notification-queue" ) print(f"Subscribed: {subscription_arn}") # Subscribe with raw message delivery subscription_arn = sns.subscribe_sqs_queue( topic_arn=topic_arn, queue_name="notification-queue", attributes={ "RawMessageDelivery": "true" } ) # Subscribe with message filtering subscription_arn = sns.subscribe_sqs_queue( topic_arn=topic_arn, queue_name="notification-queue", attributes={ "FilterPolicy": json.dumps({ "event_type": ["order_placed", "order_shipped"] }) } )
Note
The SQS queue must exist before calling this method
By default, sets queue policy to allow SNS to send messages
Subscription is confirmed automatically for SQS endpoints
For cross-region subscriptions, specify the queue’s region
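When set_queue_policy is False, the queue policy has to be managed by the caller. A commonly used policy that lets one topic send to one queue looks like the sketch below; sns_to_sqs_policy is a hypothetical helper, the ARNs are placeholders, and the exact policy the library writes may differ.

```python
import json


def sns_to_sqs_policy(queue_arn: str, topic_arn: str) -> str:
    """Return a queue policy (JSON string) allowing the topic to send messages."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Restrict the permission to this specific topic.
                "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
            }
        ],
    }
    return json.dumps(policy)


policy_json = sns_to_sqs_policy(
    queue_arn="arn:aws:sqs:us-east-1:123456789012:notification-queue",
    topic_arn="arn:aws:sns:us-east-1:123456789012:notifications",
)
print(policy_json)
```

The resulting string would be stored as the queue's "Policy" attribute, for example through the boto3 SQS set_queue_attributes call.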
- publish_message(message: SnsMessage, topic_arn: str | None = None, target_arn: str | None = None, phone_number: str | None = None) Dict[str, Any][source]#
Publish a message to an SNS topic, mobile endpoint, or phone number. Sends a message to one of three destinations: an SNS topic (fan-out), a mobile platform endpoint (push notification), or a phone number (SMS). Exactly one destination must be specified.
- Reference:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/publish.html
- Parameters:
message – SnsMessage object containing the message and metadata.
topic_arn – ARN of the SNS topic to publish to. Required if target_arn and phone_number are not specified.
target_arn – ARN of mobile platform endpoint or device. Required if topic_arn and phone_number are not specified.
phone_number – Phone number in E.164 format (e.g., +14155551234). Required if topic_arn and target_arn are not specified.
- Returns:
- Dictionary containing:
MessageId (str): Unique identifier for the published message.
SequenceNumber (str, optional): For FIFO topics only.
- Raises:
AwsClientException – If no destination specified or if publishing fails.
Example
sns = SnsClient(region="us-east-1") # Publish to topic msg = SnsMessage(Message="Hello!", Subject="Notification") response = sns.publish_message( msg, topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic" ) print(f"Published: {response['MessageId']}") # Send SMS sms_msg = SnsMessage(Message="Your code is: 123456") sns.publish_message(sms_msg, phone_number="+14155551234") # Publish to mobile endpoint push_msg = SnsMessage(Message="New message!") sns.publish_message( push_msg, target_arn="arn:aws:sns:us-east-1:123:endpoint/APNS/MyApp/abc123" )
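The "exactly one destination" rule can be validated up front. The sketch below uses a hypothetical resolve_destination helper and raises ValueError where the client would raise AwsClientException; the E.164 pattern is a simplified check, not a full phone number validator.

```python
import re
from typing import Dict, Optional

# Simplified E.164 check: "+" followed by 2 to 15 digits, no leading zero.
E164 = re.compile(r"\+[1-9]\d{1,14}")


def resolve_destination(
    topic_arn: Optional[str] = None,
    target_arn: Optional[str] = None,
    phone_number: Optional[str] = None,
) -> Dict[str, str]:
    """Return the single publish destination as an API parameter dict."""
    candidates = {
        "TopicArn": topic_arn,
        "TargetArn": target_arn,
        "PhoneNumber": phone_number,
    }
    chosen = {name: value for name, value in candidates.items() if value}
    if len(chosen) != 1:
        raise ValueError(
            "Specify exactly one of topic_arn, target_arn, or phone_number"
        )
    if phone_number and not E164.fullmatch(phone_number):
        raise ValueError(f"Phone number is not in E.164 format: {phone_number}")
    return chosen


print(resolve_destination(phone_number="+14155551234"))
```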
- publish_batch(topic_arn: str, messages: List[SnsMessage]) Dict[str, List[Dict[str, Any]]][source]#
Publish multiple messages to an SNS topic in batches. Publishes up to 10 messages per API call. For FIFO topics, messages within a batch are published in order and deduplicated within/across batches for 5 minutes. Automatically handles larger message lists by batching into chunks of batch_size.
- Parameters:
topic_arn – ARN of the SNS topic to publish to.
messages – List of SnsMessage objects to publish. Each message must have a unique Id field within the batch.
- Returns:
Dictionary containing aggregated results from all batches:
{ "Successful": [ { "Id": "string", "MessageId": "string", "SequenceNumber": "string" # FIFO only } ], "Failed": [ { "Id": "string", "Code": "string", "Message": "string", "SenderFault": True | False } ] }
- Raises:
AwsClientException – If batch publishing fails.
Example
sns = SnsClient(region="us-east-1") # Create messages with unique IDs messages = [ SnsMessage(Message="First message", Id="msg-1"), SnsMessage(Message="Second message", Id="msg-2"), SnsMessage(Message="Third message", Id="msg-3") ] # Publish batch result = sns.publish_batch( topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic", messages=messages ) print(f"Successful: {len(result['Successful'])}") print(f"Failed: {len(result['Failed'])}") # Check failures for failure in result.get('Failed', []): print(f"Failed {failure['Id']}: {failure['Message']}")
Note
Each message must have a unique Id field
Max 10 messages per batch (enforced by batch_size)
For FIFO topics, MessageGroupId is required
Total payload size per batch must be < 256 KB
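The chunking and result aggregation described for publish_batch() can be sketched as follows. publish_in_batches is a hypothetical function that takes the per-batch publish call as a parameter, so the example runs without AWS access; the fake publisher simply reports every entry as successful.

```python
from typing import Any, Callable, Dict, List

Entry = Dict[str, Any]
BatchResult = Dict[str, List[Dict[str, Any]]]


def publish_in_batches(
    entries: List[Entry],
    publish_batch: Callable[[List[Entry]], BatchResult],
    batch_size: int = 10,
) -> BatchResult:
    """Publish entries in chunks and merge the per-chunk results."""
    results: BatchResult = {"Successful": [], "Failed": []}
    for start in range(0, len(entries), batch_size):
        response = publish_batch(entries[start:start + batch_size])
        results["Successful"].extend(response.get("Successful", []))
        results["Failed"].extend(response.get("Failed", []))
    return results


calls: List[int] = []


def fake_publish_batch(batch: List[Entry]) -> BatchResult:
    """Fake API call that accepts every entry."""
    calls.append(len(batch))
    return {
        "Successful": [
            {"Id": entry["Id"], "MessageId": f"mid-{entry['Id']}"} for entry in batch
        ],
        "Failed": [],
    }


entries = [{"Id": f"msg-{i}", "Message": f"Message {i}"} for i in range(25)]
result = publish_in_batches(entries, fake_publish_batch)
print(calls, len(result["Successful"]), len(result["Failed"]))
```

Because Ids only need to be unique within one batch, callers that reuse Ids across a long list should still keep them unique overall so that failures in the aggregated result can be traced back.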
SQS Client#
AWS Simple Queue Service (SQS) client wrapper.
This module provides a high-level interface for interacting with AWS SQS, including message sending, receiving, deletion, and batch operations.
- class core_aws.services.sqs.client.SqsClient(region: str, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS Simple Queue Service (SQS).
This client provides methods for sending, receiving, and deleting messages from SQS queues. It supports both single and batch operations, with automatic retry logic for failed deletions.
Example
# Initialize client
sqs = SqsClient(region="us-east-1")

# Get a queue by name
queue = sqs.get_queue_by_name("my-queue")

# Send a message
sqs.send_message(queue.url, "Hello, SQS!")

# Receive messages
messages = sqs.receive_messages(queue.url)
for msg in messages:
    print(msg["Body"])
- client: mypy_boto3_sqs.client.SQSClient#
- __init__(region: str, **kwargs: Any) None[source]#
Initialize the SQS client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
kwargs – Additional arguments passed to boto3.client().
- create_queue(queue_name: str, **kwargs: Any) str[source]#
Create a new SQS queue with the specified name and attributes. Creates a standard or FIFO queue. Queue names can be up to 80 characters long and may contain only alphanumeric characters, hyphens (-), and underscores (_). FIFO queue names must end with the .fifo suffix.
- Parameters:
queue_name – Name of the queue to create (max 80 characters).
kwargs –
Additional boto3 parameters:
Attributes (dict): Queue configuration attributes:
DelaySeconds (str): Delivery delay in seconds (0-900). Default: 0.
MaximumMessageSize (str): Max message size in bytes (1024-262144). Default: 262144.
MessageRetentionPeriod (str): Message retention in seconds (60-1209600). Default: 345600 (4 days).
ReceiveMessageWaitTimeSeconds (str): Long-polling wait time (0-20). Default: 0.
VisibilityTimeout (str): Visibility timeout in seconds (0-43200). Default: 30.
FifoQueue (str): ‘true’ for FIFO queue, ‘false’ for standard. Default: ‘false’.
ContentBasedDeduplication (str): ‘true’ to enable content-based deduplication (FIFO only).
KmsMasterKeyId (str): ID of AWS KMS key for server-side encryption.
KmsDataKeyReusePeriodSeconds (str): KMS key reuse period (60-86400). Default: 300.
DeduplicationScope (str): ‘messageGroup’ or ‘queue’ (high-throughput FIFO only).
FifoThroughputLimit (str): ‘perQueue’ or ‘perMessageGroupId’ (high-throughput FIFO only).
RedrivePolicy (str): JSON string defining dead-letter queue:
{ "deadLetterTargetArn": "arn:aws:sqs:region:account:dlq-name", "maxReceiveCount": "3" }
RedriveAllowPolicy (str): JSON string defining which source queues can use this as DLQ.
tags (dict): Key-value pairs to assign as queue tags.
- Returns:
The queue URL string (e.g., “https://sqs.region.amazonaws.com/account/queue-name”).
- Raises:
AwsClientException – If queue creation fails.
Example
import json

sqs = SqsClient(region="us-east-1")

# Create a basic standard queue
queue_url = sqs.create_queue(queue_name="my-queue")
print(f"Queue URL: {queue_url}")

# Create a FIFO queue with custom attributes
queue_url = sqs.create_queue(
    queue_name="my-queue.fifo",
    Attributes={
        "FifoQueue": "true",
        "ContentBasedDeduplication": "true",
        "MessageRetentionPeriod": "86400",  # 1 day
        "VisibilityTimeout": "60"
    }
)

# Create a queue with dead-letter queue
queue_url = sqs.create_queue(
    queue_name="my-main-queue",
    Attributes={
        # RedrivePolicy must be passed as a JSON string
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:my-dlq",
            "maxReceiveCount": "5"
        }),
        "VisibilityTimeout": "30"
    },
    tags={
        "Environment": "production",
        "Application": "my-app"
    }
)
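The naming rules above (at most 80 characters; alphanumerics, hyphens, and underscores only; a .fifo suffix for FIFO queues) can be checked locally before calling create_queue. The following is a minimal sketch, not part of the library: `is_valid_queue_name` is a hypothetical helper, and it assumes the .fifo suffix counts toward the 80-character limit.

```python
import re

# Standard queues: 1-80 characters from [A-Za-z0-9_-].
_STANDARD_NAME = re.compile(r"[A-Za-z0-9_-]{1,80}")
# FIFO queues: same characters plus a mandatory ".fifo" suffix.
# Assumption: the 5-character suffix counts toward the 80-character limit.
_FIFO_NAME = re.compile(r"[A-Za-z0-9_-]{1,75}\.fifo")


def is_valid_queue_name(queue_name: str, fifo: bool = False) -> bool:
    """Return True if `queue_name` satisfies the naming rules for its queue type."""
    pattern = _FIFO_NAME if fifo else _STANDARD_NAME
    return pattern.fullmatch(queue_name) is not None


print(is_valid_queue_name("my-queue"))
print(is_valid_queue_name("my-queue.fifo", fifo=True))
print(is_valid_queue_name("my queue"))
```

A local check like this only catches malformed names early; the service remains the authority on whether a name is accepted.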
- get_queue_by_name(queue_name: str, **kwargs: Any) mypy_boto3_sqs.service_resource.Queue[source]#
Retrieve an existing Amazon SQS queue by name. Returns a Queue resource object that can be used to interact with the queue. To access a queue owned by another AWS account, use the QueueOwnerAWSAccountId parameter.
- Parameters:
queue_name – The name of the queue.
kwargs –
Additional boto3 parameters:
- QueueOwnerAWSAccountId (str):
AWS account ID of the account that created the queue. Required for accessing queues in other accounts.
- Returns:
sqs.Queue resource object with the following attributes:
url (str): Queue URL (https://sqs.<region>.amazonaws.com/<account>/QueueName)
dead_letter_source_queues: Dead letter queue information
meta: Queue metadata
attributes (dict): Queue attributes dictionary:
{ 'QueueArn': 'arn:aws:sqs:...', 'ApproximateNumberOfMessages': '0', 'ApproximateNumberOfMessagesNotVisible': '0', 'ApproximateNumberOfMessagesDelayed': '0', 'CreatedTimestamp': '1699539978', 'LastModifiedTimestamp': '1699540164', 'VisibilityTimeout': '300', 'MaximumMessageSize': '262144', 'MessageRetentionPeriod': '3600', 'DelaySeconds': '60', ... }
- Raises:
AwsClientException – If the queue cannot be retrieved.
Example
sqs = SqsClient(region="us-east-1")

# Get a queue in same account
queue = sqs.get_queue_by_name("my-queue")
print(f"Queue URL: {queue.url}")

# Get a queue in another account
queue = sqs.get_queue_by_name(
    "cross-account-queue",
    QueueOwnerAWSAccountId="123456789012"
)
- send_message(queue_url: str, message: str, **kwargs: Any) Dict[str, Any][source]#
Send a single message to an SQS queue. Delivers a message to the specified queue. Messages can contain XML, JSON, or unformatted text. Allowed Unicode characters: #x9, #xA, #xD, #x20 to #xD7FF, #xE000 to #xFFFD, #x10000 to #x10FFFF.
- Parameters:
queue_url – URL of the SQS queue.
message – Message body (up to 256 KB).
kwargs –
Additional boto3 parameters:
DelaySeconds (int): Delay before message becomes available (0-900 seconds).
MessageAttributes (dict): Custom attributes for the message.
MessageDeduplicationId (str): Deduplication ID for FIFO queues.
MessageGroupId (str): Message group ID (required for FIFO queues).
- Returns:
Dictionary containing MessageId, MD5OfMessageBody, and, for FIFO queues, SequenceNumber.
- Raises:
AwsClientException – If message sending fails.
Example
sqs = SqsClient(region="us-east-1")

# Send a simple message
response = sqs.send_message(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    message="Hello, SQS!"
)
print(f"Message ID: {response['MessageId']}")

# Send with delay and attributes
sqs.send_message(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    message='{"event": "user_signup", "user_id": 123}',
    DelaySeconds=30,
    MessageAttributes={
        "EventType": {
            "StringValue": "UserSignup",
            "DataType": "String"
        }
    }
)
- send_message_batch(queue_url: str, entries: List[Dict[str, Any]], **kwargs: Any) Dict[str, Any][source]#
Send multiple messages to an SQS queue in a single request. Delivers up to 10 messages to the specified queue in a single batch operation. For FIFO queues, messages within a batch are enqueued in the order they are sent.
- Parameters:
queue_url – URL of the SQS queue.
entries –
List of message entries (max 10). Each entry must contain:
Id (str): Unique identifier for the message (within batch).
MessageBody (str): Message content (up to 256 KB).
DelaySeconds (int, optional): Message delay (0-900 seconds).
MessageAttributes (dict, optional): Custom message attributes.
MessageDeduplicationId (str, optional): For FIFO queues.
MessageGroupId (str, optional): For FIFO queues.
kwargs – Additional boto3 parameters.
- Returns:
Dictionary containing Successful and Failed lists:
{ "Successful": [ { "Id": "string", "MessageId": "string", "MD5OfMessageBody": "string", "SequenceNumber": "string" } ], "Failed": [ { "Id": "string", "SenderFault": True|False, "Code": "string", "Message": "string" } ] }
- Raises:
AwsClientException – If batch sending fails.
Example
sqs = SqsClient(region="us-east-1")

# Send multiple messages
response = sqs.send_message_batch(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    entries=[
        {
            "Id": "msg1",
            "MessageBody": "First message"
        },
        {
            "Id": "msg2",
            "MessageBody": "Second message",
            "DelaySeconds": 10
        },
        {
            "Id": "msg3",
            "MessageBody": "Third message"
        }
    ]
)
print(f"Successful: {len(response['Successful'])}")
print(f"Failed: {len(response['Failed'])}")
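Because a batch response can report partial failure, callers often need to work out which entries are worth sending again. The sketch below is one possible approach, not library code: `retryable_entries` is a hypothetical helper, and it assumes that entries flagged with SenderFault=True (an error on the sender's side) should not be retried unchanged.

```python
from typing import Any, Dict, List


def retryable_entries(
    entries: List[Dict[str, Any]], response: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Return the original entries whose failure was not caused by the sender."""
    retry_ids = {
        failure["Id"]
        for failure in response.get("Failed", [])
        if not failure.get("SenderFault", False)
    }
    return [entry for entry in entries if entry["Id"] in retry_ids]


entries = [
    {"Id": "msg1", "MessageBody": "First message"},
    {"Id": "msg2", "MessageBody": "Second message"},
    {"Id": "msg3", "MessageBody": "Third message"},
]
# Illustrative response in the shape documented above; the codes are made up.
response = {
    "Successful": [{"Id": "msg1", "MessageId": "abc", "MD5OfMessageBody": "..."}],
    "Failed": [
        {"Id": "msg2", "SenderFault": False, "Code": "ExampleServiceError", "Message": "..."},
        {"Id": "msg3", "SenderFault": True, "Code": "ExampleInvalidInput", "Message": "..."},
    ],
}
print([entry["Id"] for entry in retryable_entries(entries, response)])
```

The returned list can be passed back to send_message_batch as a new entries argument.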
- receive_messages(queue_url: str, max_number_of_msg: int = 10, **kwargs: Any) List[Dict[str, Any]][source]#
Receive one or more messages from an SQS queue. Retrieves up to 10 messages from the specified queue. Use the WaitTimeSeconds parameter to enable long-polling (recommended for reducing empty receives and API costs).
- Parameters:
queue_url – URL of the SQS queue.
max_number_of_msg – Maximum number of messages to retrieve (1-10). Default: 10.
kwargs –
Additional boto3 parameters:
AttributeNames (list): System attributes to retrieve (e.g., [‘All’, ‘ApproximateReceiveCount’]).
MessageAttributeNames (list): Custom message attributes to retrieve (e.g., [‘All’]).
VisibilityTimeout (int): Duration message is hidden after retrieval (0-43200 seconds).
WaitTimeSeconds (int): Long-polling wait time (0-20 seconds).
- Returns:
List of message dictionaries:
[ { "MessageId": "string", "ReceiptHandle": "string", "MD5OfBody": "string", "Body": "string", "Attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1699539978000" }, "MD5OfMessageAttributes": "string", "MessageAttributes": { "AttributeName": { "StringValue": "string", "BinaryValue": b"bytes", "DataType": "String" } } } ]
- Raises:
AwsClientException – If message retrieval fails.
Example
sqs = SqsClient(region="us-east-1")

# Basic receive
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
)
for msg in messages:
    print(f"Message: {msg['Body']}")

# Long-polling with attributes
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    max_number_of_msg=5,
    WaitTimeSeconds=20,  # Long-poll for 20 seconds
    MessageAttributeNames=['All'],
    AttributeNames=['All']
)
- retrieve_all_messages(queue_url: str, **kwargs: Any) Iterator[Dict[str, Any]][source]#
Retrieve all messages from a queue using an iterator. Continuously polls the queue and yields messages until the queue is empty. Useful for processing all messages in a queue.
- Parameters:
queue_url – URL of the SQS queue.
kwargs – Additional parameters passed to receive_messages().
- Returns:
Iterator yielding message dictionaries.
Example
sqs = SqsClient(region="us-east-1")
queue_url = "https://sqs.us-east-1.amazonaws.com/123/my-queue"

# Process all messages in queue
for message in sqs.retrieve_all_messages(
    queue_url=queue_url,
    WaitTimeSeconds=5
):
    print(f"Processing: {message['Body']}")
    # Process the message...
    sqs.delete_message(queue_url, message['ReceiptHandle'])
Warning
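This method continues polling until the queue is empty. Delete each message after processing it: a message that is not deleted becomes visible again once its visibility timeout expires and will be yielded again, which can cause an infinite loop.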
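The poll-until-empty behaviour can be pictured as a small generator. This is a sketch of the general pattern only, not the library's implementation: `drain` and `fake_receive` are hypothetical names, and the in-memory list stands in for a queue so the example runs without AWS access.

```python
from typing import Any, Callable, Dict, Iterator, List


def drain(receive: Callable[[], List[Dict[str, Any]]]) -> Iterator[Dict[str, Any]]:
    """Yield messages from repeated `receive()` calls until one returns nothing."""
    while True:
        batch = receive()
        if not batch:
            return
        yield from batch


# Hypothetical in-memory queue; each call hands out at most two messages.
pending = [{"Body": f"message {i}"} for i in range(5)]


def fake_receive() -> List[Dict[str, Any]]:
    batch = pending[:2]
    del pending[:2]  # unlike SQS, this stand-in removes messages on receipt
    return batch


bodies = [message["Body"] for message in drain(fake_receive)]
print(bodies)
```

Note the difference from a real queue: here a received message is gone immediately, whereas in SQS it stays in the queue until it is explicitly deleted.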
- delete_message(queue_url: str, receipt_handle: str, **kwargs: Any) Dict[str, Any][source]#
Delete a single message from an SQS queue. Deletes the specified message using its ReceiptHandle (not MessageId). Messages are automatically deleted after the retention period expires. Amazon SQS can delete a message even if it’s locked by visibility timeout.
- Parameters:
queue_url – URL of the SQS queue.
receipt_handle – Receipt handle of the message (from receive_message).
kwargs – Additional boto3 parameters.
- Returns:
Empty dictionary on success.
- Raises:
AwsClientException – If message deletion fails.
Example
sqs = SqsClient(region="us-east-1")

# Receive and delete a message
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
)
for msg in messages:
    # Process the message...
    print(msg['Body'])

    # Delete after processing
    sqs.delete_message(
        queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
        receipt_handle=msg['ReceiptHandle']
    )
- delete_message_batch(queue_url: str, entries: List[Dict[str, str]], **kwargs: Any) Dict[str, Any][source]#
Delete multiple messages from an SQS queue in a single request. Deletes up to 10 messages in a single batch operation. Each entry must include the message Id and ReceiptHandle.
- Parameters:
queue_url – URL of the SQS queue.
entries –
List of message entries to delete (max 10). Each entry must contain:
Id (str): Unique identifier for this deletion (within batch).
ReceiptHandle (str): Receipt handle from receive_message.
kwargs – Additional boto3 parameters.
- Returns:
Dictionary containing Successful and Failed lists:
{ "Successful": [ { "Id": "string" } ], "Failed": [ { "Id": "string", "SenderFault": True|False, "Code": "string", "Message": "string" } ] }
- Raises:
AwsClientException – If batch deletion fails.
Example
sqs = SqsClient(region="us-east-1")

# Receive messages
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
)

# Delete in batch
delete_entries = [
    {
        "Id": msg['MessageId'],
        "ReceiptHandle": msg['ReceiptHandle']
    }
    for msg in messages
]
result = sqs.delete_message_batch(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    entries=delete_entries
)
print(f"Deleted: {len(result['Successful'])}")
print(f"Failed: {len(result['Failed'])}")
- delete_messages(queue_url: str, entries: List[Dict[str, str]], retries: int = 3, **kwargs: Any) Dict[str, Any][source]#
Delete messages with automatic retries. This is a wrapper over delete_message_batch(): if a message deletion fails, it is retried until it succeeds or the maximum number of attempts is exhausted.
- Parameters:
queue_url – URL of the SQS queue.
entries – Entries identifying the messages to delete.
retries – Number of retries if errors occur while deleting messages. Default: 3.
kwargs – Additional arguments passed to delete_message_batch().
- Returns:
{ "Successful": [{ "Id": "string" }], "Failed": [{ "Id": "string", "SenderFault": True|False, "Code": "string", "Message": "string" }] }
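The retry behaviour can be sketched as a loop that re-sends only the entries reported in Failed. This is an illustration under stated assumptions, not the library's code: `delete_with_retries` and `flaky_delete` are hypothetical, `retries` is interpreted here as the number of additional attempts after the first one, and splitting into chunks of 10 is omitted for brevity.

```python
from typing import Callable, Dict, List

Entry = Dict[str, str]


def delete_with_retries(
    entries: List[Entry],
    delete_batch: Callable[[List[Entry]], Dict[str, list]],
    retries: int = 3,
) -> Dict[str, list]:
    """Call `delete_batch`, re-sending only entries that failed on the last attempt."""
    successful: list = []
    failed: list = []
    pending = list(entries)
    # Assumption: one initial attempt plus up to `retries` further attempts.
    for _ in range(retries + 1):
        if not pending:
            break
        response = delete_batch(pending)
        successful.extend(response.get("Successful", []))
        failed = response.get("Failed", [])
        failed_ids = {item["Id"] for item in failed}
        pending = [entry for entry in pending if entry["Id"] in failed_ids]
    return {"Successful": successful, "Failed": failed}


calls: List[List[str]] = []


def flaky_delete(batch: List[Entry]) -> Dict[str, list]:
    # Hypothetical stand-in for one batch-delete call: entry "b" fails once.
    calls.append([entry["Id"] for entry in batch])
    if len(calls) == 1:
        return {
            "Successful": [{"Id": e["Id"]} for e in batch if e["Id"] != "b"],
            "Failed": [{"Id": "b", "SenderFault": False,
                        "Code": "ExampleError", "Message": "try again"}],
        }
    return {"Successful": [{"Id": e["Id"]} for e in batch], "Failed": []}


entries = [{"Id": name, "ReceiptHandle": f"rh-{name}"} for name in "abc"]
result = delete_with_retries(entries, flaky_delete)
print(calls, result["Failed"])
```

Only the failed entry is sent on the second attempt, so the stand-in sees a batch of three followed by a batch of one.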
SSM Client#
AWS Systems Manager (SSM) Parameter Store client wrapper.
This module provides a high-level interface for interacting with AWS Systems Manager Parameter Store, including support for retrieving parameters, secrets, and managing parameter hierarchies.
- class core_aws.services.ssm.client.SsmClient(region: str, **kwargs: Any)[source]#
Bases: AwsClient
Client for AWS Systems Manager (SSM) Parameter Store.
This client provides methods for retrieving, storing, and managing parameters in AWS Systems Manager Parameter Store. It supports:
Individual parameter retrieval with decryption
Bulk parameter retrieval by path hierarchy
Integration with AWS Secrets Manager
Parameter creation and updates
Object attribute population from SSM parameters
Example
# Initialize client
ssm = SsmClient(region="us-east-1")

# Get a single parameter
param = ssm.get_parameter("/myapp/database/host")
print(param["Value"])

# Get all parameters under a path
for param in ssm.get_parameters_by_path("/myapp/"):
    print(f"{param['Name']}: {param['Value']}")

# Retrieve a secret from Secrets Manager
secret = ssm.get_secret("my-database-password")
- client: mypy_boto3_ssm.client.SSMClient#
- __init__(region: str, **kwargs: Any) None[source]#
Initialize the SSM client.
- Parameters:
region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).
kwargs – Additional arguments passed to boto3.client().
- get_secret(secret_id: str) str[source]#
Retrieve a secret value from AWS Secrets Manager via SSM Parameter Store. This method uses SSM’s special reference format to access secrets stored in AWS Secrets Manager: /aws/reference/secretsmanager/{secret_id}.
- Parameters:
secret_id – The ID or name of the secret in Secrets Manager.
- Returns:
The decrypted secret value as a string.
- Raises:
AwsClientException – If the secret cannot be retrieved.
Example
ssm = SsmClient(region="us-east-1")
db_password = ssm.get_secret("prod/database/password")
print(f"Password: {db_password}")
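The reference format mentioned above can be made concrete with a small sketch. It is illustrative rather than the library's implementation: `secret_reference`, `fetch_secret`, and `FakeSsmClient` are hypothetical names, and the fake client only mimics the response shape of the boto3 SSM `get_parameter` call so the example runs offline.

```python
SECRETS_MANAGER_PREFIX = "/aws/reference/secretsmanager/"


def secret_reference(secret_id: str) -> str:
    """Build the Parameter Store name that refers to a Secrets Manager secret."""
    return f"{SECRETS_MANAGER_PREFIX}{secret_id}"


def fetch_secret(client, secret_id: str) -> str:
    """Read the secret through an SSM-style client and return its value."""
    response = client.get_parameter(
        Name=secret_reference(secret_id),
        WithDecryption=True,  # secrets are returned only when decryption is requested
    )
    return response["Parameter"]["Value"]


class FakeSsmClient:
    """Hypothetical stand-in for the boto3 SSM client, for illustration only."""

    def get_parameter(self, Name: str, WithDecryption: bool = False) -> dict:
        return {"Parameter": {"Name": Name, "Value": "example-secret-value"}}


print(secret_reference("prod/database/password"))
print(fetch_secret(FakeSsmClient(), "prod/database/password"))
```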
- get_parameter(parameter_name: str, with_decryption: bool = True) SSMParameter[source]#
Retrieve a parameter from SSM Parameter Store. Retrieves information about a single parameter including its value, type, version, and metadata. Supports automatic decryption of SecureString parameters.
- Parameters:
parameter_name – The fully qualified name of the parameter you want to query. Must include the complete hierarchy path (e.g., ‘/myapp/database/host’).
with_decryption – Return decrypted values for SecureString parameters. This flag is ignored for String and StringList parameter types. Default: True.
- Returns:
SSMParameter dictionary containing parameter information.
- Raises:
AwsClientException – If the parameter cannot be retrieved.
Example
ssm = SsmClient(region="us-east-1")

# Get a standard parameter
param = ssm.get_parameter("/myapp/config/api_url")
print(param["Value"])  # "https://api.example.com"

# Get a SecureString parameter (auto-decrypted)
secret_param = ssm.get_parameter("/myapp/secrets/api_key")
print(secret_param["Type"])  # "SecureString"

# Get without decryption
encrypted = ssm.get_parameter(
    "/myapp/secrets/api_key",
    with_decryption=False
)
- Return Structure:
{ "Name": "string", "Type": "String"|"StringList"|"SecureString", "Value": "string", "Version": 123, "Selector": "string", "SourceResult": "string", "LastModifiedDate": datetime(2015, 1, 1), "ARN": "string", "DataType": "string" }
- get_parameters_by_path(path: str, with_decryption: bool = True, **kwargs: Any) Iterator[SSMParameter][source]#
Retrieve all parameters under a specific path hierarchy. Retrieves parameters from a path in SSM Parameter Store, automatically handling pagination. By default only the immediate children of the path are returned; pass Recursive=True to include the whole hierarchy. This is useful for fetching multiple related parameters at once (e.g., all database configuration parameters under /myapp/database/).
- Parameters:
path –
The parameter path hierarchy. Hierarchies start with a forward slash (/) and can have up to 15 levels. Examples:
/myapp/ - gets all parameters under myapp
/myapp/database/ - gets all database parameters
/prod/api/config/ - gets all production API configs
with_decryption – Retrieve all parameters with their values decrypted (for SecureString types). Default: True.
kwargs –
Additional boto3 parameters:
- Recursive (bool):
Retrieve all parameters within the hierarchy, not just immediate children. Default: False.
- ParameterFilters (list):
- Filters to limit results. List of dicts with keys:
Key (string): Filter key (e.g., ‘Type’, ‘Name’)
Option (string): Filter option (e.g., ‘Equals’, ‘BeginsWith’)
Values (list): Values to match
- MaxResults (int):
Maximum number of items per API call (1-10). Pagination continues automatically regardless of this value.
- Returns:
Iterator yielding SSMParameter dictionaries. Automatically handles pagination across multiple API calls.
- Raises:
AwsClientException – If parameters cannot be retrieved.
Example
ssm = SsmClient(region="us-east-1")

# Get all parameters under a path
for param in ssm.get_parameters_by_path("/myapp/database/"):
    print(f"{param['Name']}: {param['Value']}")

# Get all parameters recursively
for param in ssm.get_parameters_by_path(
    "/myapp/",
    Recursive=True
):
    print(f"{param['Name']}: {param['Value']}")

# Filter by type
for param in ssm.get_parameters_by_path(
    "/myapp/",
    ParameterFilters=[
        {
            "Key": "Type",
            "Option": "Equals",
            "Values": ["SecureString"]
        }
    ]
):
    print(f"Secret: {param['Name']}")
- Return Structure:
Each yielded item has the structure:
{ "Name": "string", "Type": "String"|"StringList"|"SecureString", "Value": "string", "Version": 123, "Selector": "string", "SourceResult": "string", "LastModifiedDate": datetime(2015, 1, 1), "ARN": "string", "DataType": "string" }
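Automatic pagination of this kind is typically a loop that follows NextToken until the service stops returning one. The sketch below shows that pattern under stated assumptions and is not the library's implementation: `iter_parameters` and `FakeSsm` are hypothetical, and the fake client mimics the response shape of the boto3 SSM `get_parameters_by_path` call with two hard-coded pages.

```python
from typing import Any, Dict, Iterator


def iter_parameters(client, path: str, **kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield parameters page by page, following NextToken until it is absent."""
    request: Dict[str, Any] = {"Path": path, **kwargs}
    while True:
        response = client.get_parameters_by_path(**request)
        yield from response.get("Parameters", [])
        token = response.get("NextToken")
        if not token:
            return
        request["NextToken"] = token


class FakeSsm:
    """Hypothetical stand-in that serves two pages of results."""

    def __init__(self) -> None:
        self.pages = {
            None: {
                "Parameters": [{"Name": "/myapp/a", "Value": "1"}],
                "NextToken": "page-2",
            },
            "page-2": {"Parameters": [{"Name": "/myapp/b", "Value": "2"}]},
        }

    def get_parameters_by_path(self, **request: Any) -> Dict[str, Any]:
        return self.pages[request.get("NextToken")]


names = [p["Name"] for p in iter_parameters(FakeSsm(), "/myapp/", WithDecryption=True)]
print(names)
```

Because the function is a generator, later pages are requested only when the caller keeps iterating.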
- put_parameter(name: str, value: str, overwrite: bool = False, **kwargs: Any) Dict[str, Any][source]#
Create or update a parameter in SSM Parameter Store. Adds a new parameter or updates an existing one in Parameter Store. Supports standard, advanced, and intelligent-tiering parameters with optional encryption for SecureString types.
- Parameters:
name – The fully qualified name of the parameter. Must include the complete hierarchy path with leading forward slash (/). Example: /Dev/DBServer/MySQL/db-string13
value – The parameter value. Standard parameters support up to 4 KB, advanced parameters support up to 8 KB.
overwrite – Overwrite an existing parameter. If False and parameter exists, raises an error. Default: False.
kwargs –
Additional boto3 parameters:
- Description (str):
Information about the parameter. Optional but recommended.
- Type (str):
Parameter type: ‘String’, ‘StringList’, or ‘SecureString’. Default: ‘String’.
- KeyId (str):
KMS key ID for encrypting SecureString parameters. Applies only to the SecureString type. If not specified, the default AWS-managed key for the account is used.
- AllowedPattern (str):
Regex pattern to validate parameter value. Example: ^\d+$ for numbers only.
- Tags (list):
Resource tags. List of dicts with ‘Key’ and ‘Value’.
- Tier (str):
’Standard’, ‘Advanced’, or ‘Intelligent-Tiering’. Default: ‘Standard’.
- DataType (str):
Data type hint (e.g., ‘text’, ‘aws:ec2:image’).
- Returns:
Dictionary containing version and tier information.
- Raises:
AwsClientException – If parameter creation/update fails.
Example
ssm = SsmClient(region="us-east-1")

# Create a simple parameter
result = ssm.put_parameter(
    name="/myapp/config/api_url",
    value="https://api.example.com"
)
print(f"Version: {result['Version']}")

# Create a SecureString parameter
ssm.put_parameter(
    name="/myapp/secrets/api_key",
    value="secret-key-12345",
    Type="SecureString",
    Description="API key for external service"
)

# Update an existing parameter
ssm.put_parameter(
    name="/myapp/config/api_url",
    value="https://new-api.example.com",
    overwrite=True
)

# Create with tags
ssm.put_parameter(
    name="/myapp/config/version",
    value="1.0.0",
    Tags=[
        {"Key": "Environment", "Value": "Production"},
        {"Key": "Application", "Value": "MyApp"}
    ]
)
- Return Structure:
{ "Version": 123, "Tier": "Standard" | "Advanced" | "Intelligent-Tiering" }
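The size limits quoted for the value parameter suggest a simple pre-flight check for choosing a Tier. This is a rough sketch and not library behaviour: `minimum_tier` is a hypothetical helper, and it assumes that 1 KB means 1024 bytes and that size is measured on the UTF-8 encoding of the value.

```python
STANDARD_LIMIT_BYTES = 4 * 1024  # assumption: 4 KB = 4096 bytes
ADVANCED_LIMIT_BYTES = 8 * 1024  # assumption: 8 KB = 8192 bytes


def minimum_tier(value: str) -> str:
    """Return the smallest tier whose size limit can hold `value`."""
    size = len(value.encode("utf-8"))
    if size <= STANDARD_LIMIT_BYTES:
        return "Standard"
    if size <= ADVANCED_LIMIT_BYTES:
        return "Advanced"
    raise ValueError(f"value is {size} bytes, above the {ADVANCED_LIMIT_BYTES}-byte limit")


print(minimum_tier("https://api.example.com"))
print(minimum_tier("x" * 5000))
```

The result could be passed as the Tier keyword argument of put_parameter; 'Intelligent-Tiering' leaves the same decision to the service.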
- retrieve_parameters_from_ssm(ssm_path: str, parameters: List[str]) Dict[str, str][source]#
Retrieve specific parameters from SSM by matching suffixes. Retrieves all parameters under a path and returns a dictionary mapping parameter suffixes to their values. This is useful when you know the parameter suffixes but not the full paths.
- Parameters:
ssm_path – SSM path to search for parameters (e.g., ‘/myapp/database/’).
parameters – List of parameter name suffixes to extract. Each suffix will be matched against the end of parameter names. Example: [‘host’, ‘port’, ‘username’]
- Returns:
Dictionary mapping parameter suffixes to their values. Returns empty string for suffixes not found.
Example
ssm = SsmClient(region="us-east-1")

# Retrieve specific database parameters
db_params = ssm.retrieve_parameters_from_ssm(
    ssm_path="/myapp/database/",
    parameters=["host", "port", "username"]
)
# Result: {
#     "host": "db.example.com",
#     "port": "5432",
#     "username": "admin"
# }
print(f"Database: {db_params['host']}:{db_params['port']}")
Warning
This method has O(n*m) complexity where n=number of parameters in SSM path and m=number of suffixes. For large parameter sets, consider using get_parameters_by_path() directly and filtering results manually for better performance.
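The suffix matching and its O(n*m) cost can be illustrated with a nested loop over parameters and suffixes. This is a sketch of the idea, not the library's code: `match_suffixes` is a hypothetical function, and the use of `str.endswith` as the matching rule is an assumption based on the description above.

```python
from typing import Dict, Iterable, List


def match_suffixes(
    ssm_parameters: Iterable[Dict[str, str]], suffixes: List[str]
) -> Dict[str, str]:
    """Map each suffix to the value of a parameter whose name ends with it."""
    # Suffixes that match nothing keep the empty-string default.
    result = {suffix: "" for suffix in suffixes}
    for param in ssm_parameters:      # n parameters
        for suffix in suffixes:       # m suffixes, hence O(n*m) comparisons
            if param["Name"].endswith(suffix):
                result[suffix] = param["Value"]
    return result


parameters = [
    {"Name": "/myapp/database/host", "Value": "db.example.com"},
    {"Name": "/myapp/database/port", "Value": "5432"},
]
db_params = match_suffixes(parameters, ["host", "port", "username"])
print(db_params)
```

One consequence of plain suffix matching is that a suffix such as "port" would also match a name ending in "support", so distinctive suffixes are safer.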
- update_obj_attrs(obj: object, ssm_path: str) None[source]#
Update object attributes with values from SSM Parameter Store. Scans object attributes and replaces their values with matching SSM parameter values. The current attribute value is treated as the SSM parameter name to look up.
- Parameters:
obj – Object whose attributes will be updated. Must have public (non-underscore) attributes.
ssm_path – SSM path to retrieve parameters from (e.g., ‘/myapp/config/’).
Example
class DatabaseConfig:
    def __init__(self):
        self.host = "/myapp/database/host"
        self.port = "/myapp/database/port"
        self.username = "/myapp/database/username"

ssm = SsmClient(region="us-east-1")
config = DatabaseConfig()

# Before: config.host = "/myapp/database/host"
ssm.update_obj_attrs(config, "/myapp/database/")

# After: config.host = "db.example.com"
print(config.host)  # "db.example.com"
print(config.port)  # "5432"
Warning
This method has O(n*m) complexity where n=number of SSM parameters and m=number of object attributes. Use with caution on large objects or parameter sets.
Note
Only public attributes (not starting with ‘_’) are updated
Methods and callable attributes are skipped
Attribute value must exactly match the SSM parameter name
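The three rules in the note can be sketched as a loop over an object's instance attributes. This is an illustration only, not the library's implementation: `apply_parameters` is a hypothetical function, it takes an already-fetched mapping of parameter names to values instead of calling SSM, and it uses a dictionary lookup rather than the nested scan described in the warning.

```python
from typing import Dict


def apply_parameters(obj: object, values_by_name: Dict[str, str]) -> None:
    """Replace public attribute values that exactly match a parameter name."""
    for attr, current in list(vars(obj).items()):
        if attr.startswith("_") or callable(current):
            continue  # skip private and callable attributes
        if isinstance(current, str) and current in values_by_name:
            setattr(obj, attr, values_by_name[current])


class DatabaseConfig:
    def __init__(self) -> None:
        self.host = "/myapp/database/host"
        self.port = "/myapp/database/port"
        self.timeout = 30                      # not a parameter name: left alone
        self._token = "/myapp/database/host"   # private: left alone


config = DatabaseConfig()
apply_parameters(
    config,
    {"/myapp/database/host": "db.example.com", "/myapp/database/port": "5432"},
)
print(config.host, config.port, config.timeout)
```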