Services#

Service Clients#

Base Client#

Base classes for AWS service clients.

This module provides the foundational classes for all AWS service client wrappers in the core-aws library.

class core_aws.services.base.AwsClient(service: str, **kwargs: Any)[source]#

Bases: object

Base class for all AWS service client wrappers.

This class provides a common interface for creating boto3 clients with consistent error handling and initialization patterns. All service-specific client classes should inherit from this base class.

Parameters:

client – The underlying boto3 client instance for the AWS service.

Example

class MyServiceClient(AwsClient):
    def __init__(self, region: str = "us-east-1", **kwargs):
        super().__init__(
            service="my_service",
            region_name=region,
            **kwargs
        )

    def my_operation(self):
        return self.client.some_operation()
__init__(service: str, **kwargs: Any) None[source]#

Initialize an AWS service client. Creates a boto3 client for the specified AWS service with the provided configuration options.

Parameters:
  • service – AWS service name (e.g., ‘s3’, ‘sqs’, ‘ssm’, ‘lambda’).

  • kwargs

    Additional arguments passed to boto3.client():
    • region_name: AWS region (e.g., ‘us-east-1’).

    • aws_access_key_id: AWS access key ID.

    • aws_secret_access_key: AWS secret access key.

    • aws_session_token: AWS session token.

    • endpoint_url: Custom endpoint URL (for LocalStack, etc.).

    • config: botocore.client.Config instance.

    • Any other boto3.client() parameters.

Raises:

AwsClientException – If client creation fails.

Example

# Standard usage
client = AwsClient("s3", region_name="us-east-1")

# With custom endpoint (LocalStack)
client = AwsClient(
    "sqs",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1"
)
client: BaseClient#
exception core_aws.services.base.AwsClientException[source]#

Bases: Exception

Custom exception for AWS client operations. This exception is raised when AWS client operations fail. It can wrap the original exception or provide a custom error message.

Usage:
# Simple pass-through (preserves stack trace with 'from')
try:
    client.some_operation()
except Exception as error:
    raise AwsClientException(error) from error

# With custom message
try:
    client.some_operation()
except Exception as error:
    raise AwsClientException(f"Failed to do X: {error}") from error

# Custom message only (no original exception)
raise AwsClientException("Operation failed: custom reason")

CloudFormation Client#

AWS CloudFormation client wrapper.

This module provides a high-level interface for interacting with AWS CloudFormation, including stack description and output value retrieval operations.

class core_aws.services.cloud_formation.client.CloudFormationClient(region: str, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS CloudFormation.

This client provides methods for interacting with CloudFormation stacks, including retrieving stack information and extracting output values. Simplifies common stack inspection operations with automatic error handling.

Example

# Initialize client
cfn = CloudFormationClient(region="us-east-1")

# Get stack details
stack = cfn.describe_stack(stack_name="my-app-stack")
print(f"Stack Status: {stack['StackStatus']}")
print(f"Stack Outputs: {stack['Outputs']}")

# Get specific output value by export name
vpc_id = cfn.get_output_value(
    stack_name="my-app-stack",
    export_name="MyAppVpcId"
)
if vpc_id:
    print(f"VPC ID: {vpc_id}")
client: mypy_boto3_cloudformation.client.CloudFormationClient#
__init__(region: str, **kwargs: Any) None[source]#

Initialize the CloudFormation client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • kwargs – Additional arguments passed to boto3.client().

describe_stack(stack_name: str) Dict[str, Any][source]#

Retrieve detailed information about a CloudFormation stack. Returns comprehensive stack details including status, parameters, outputs, tags, and capabilities. This is a convenience wrapper around describe_stacks that returns only the first stack.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation/client/describe_stacks.html

Parameters:

stack_name – Name or ARN of the CloudFormation stack. Example: “my-app-stack” or full ARN.

Returns:

Dictionary containing stack details:

{
    "StackId": "arn:aws:cloudformation:...",
    "StackName": "my-app-stack",
    "Description": "My application infrastructure",
    "Parameters": [
        {
            "ParameterKey": "InstanceType",
            "ParameterValue": "t3.micro"
        }
    ],
    "CreationTime": datetime(2024, 1, 1, 0, 0, 0),
    "LastUpdatedTime": datetime(2024, 6, 1, 0, 0, 0),
    "StackStatus": "CREATE_COMPLETE",  # or UPDATE_COMPLETE, etc.
    "StackStatusReason": "string",
    "Outputs": [
        {
            "OutputKey": "VpcId",
            "OutputValue": "vpc-12345678",
            "Description": "VPC ID",
            "ExportName": "MyAppVpcId"
        }
    ],
    "Tags": [
        {
            "Key": "Environment",
            "Value": "production"
        }
    ],
    "Capabilities": ["CAPABILITY_IAM"],
    "DriftInformation": {
        "StackDriftStatus": "IN_SYNC" | "DRIFTED" | "NOT_CHECKED"
    }
}

Raises:

AwsClientException – If stack doesn’t exist or operation fails.

Example

cfn = CloudFormationClient(region="us-east-1")

# Get stack details
stack = cfn.describe_stack(stack_name="my-app-stack")

print(f"Stack: {stack['StackName']}")
print(f"Status: {stack['StackStatus']}")
print(f"Created: {stack['CreationTime']}")

# Check stack parameters
for param in stack.get("Parameters", []):
    print(f"  {param['ParameterKey']}: {param['ParameterValue']}")

# Check stack outputs
for output in stack.get("Outputs", []):
    print(f"  {output['OutputKey']}: {output['OutputValue']}")

# Check stack status before operations
if stack["StackStatus"] in ["CREATE_COMPLETE", "UPDATE_COMPLETE"]:
    print("Stack is stable")
elif "IN_PROGRESS" in stack["StackStatus"]:
    print("Stack operation in progress")
elif "FAILED" in stack["StackStatus"]:
    print(f"Stack failed: {stack.get('StackStatusReason', 'Unknown')}")
get_output_value(stack_name: str, export_name: str) str | None[source]#

Retrieve a specific output value from a CloudFormation stack by export name.

Searches through stack outputs to find the value associated with the specified export name. Returns None if the export name is not found. This is useful for retrieving exported resource identifiers (VPC IDs, security group IDs, etc.) from stacks.

Parameters:
  • stack_name – Name or ARN of the CloudFormation stack.

  • export_name – Export name of the output value to retrieve. This is the “ExportName” field in the stack outputs, not the “OutputKey”.

Returns:

Output value as string if found, None if export name doesn’t exist.

Raises:

AwsClientException – If stack doesn’t exist or operation fails.

Example

cfn = CloudFormationClient(region="us-east-1")

# Get specific output by export name
vpc_id = cfn.get_output_value(
    stack_name="network-stack",
    export_name="MyAppVpcId"
)

if vpc_id:
    print(f"VPC ID: {vpc_id}")
else:
    print("Export 'MyAppVpcId' not found in stack outputs")

# Get multiple outputs
exports_to_fetch = [
    "MyAppVpcId",
    "MyAppSubnetId",
    "MyAppSecurityGroupId"
]

for export in exports_to_fetch:
    value = cfn.get_output_value("network-stack", export)
    if value:
        print(f"{export}: {value}")
    else:
        print(f"{export}: Not found")

# Use output value in another operation
db_endpoint = cfn.get_output_value(
    stack_name="database-stack",
    export_name="DatabaseEndpoint"
)
if db_endpoint:
    # Use the endpoint in your application
    connection_string = f"postgresql://{db_endpoint}:5432/mydb"

Note

  • Export names must be unique within a region

  • This method searches by ExportName, not OutputKey

  • Returns None (not an error) if export name is not found

  • Automatically handles stacks without Outputs section

DynamoDB Client#

AWS DynamoDB client wrapper.

This module provides a high-level interface for interacting with AWS DynamoDB, including item operations like get_item and update_item with automatic error handling.

class core_aws.services.dynamo.client.DynamoDbClient(region: str, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS DynamoDB.

This client provides methods for interacting with DynamoDB tables, including retrieving and updating items. Supports both single-item operations and conditional updates with automatic error handling.

Example

# Initialize client
dynamodb = DynamoDbClient(region="us-east-1")

# Get item by primary key
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}}
)
if "Item" in response:
    print(f"User: {response['Item']}")

# Update item with conditional expression
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET #name = :name, lastLogin = :timestamp",
    expression_attribute_values={
        ":name": {"S": "John Doe"},
        ":timestamp": {"N": "1609459200"}
    },
    ExpressionAttributeNames={"#name": "name"}
)
client: mypy_boto3_dynamodb.client.DynamoDBClient#
__init__(region: str, **kwargs: Any) None[source]#

Initialize the DynamoDB client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • kwargs – Additional arguments passed to boto3.client().

get_item(table: str, key: Dict[str, Any], **kwargs: Any) Dict[str, Any][source]#

Retrieve an item from a DynamoDB table by primary key. Returns all attributes for the item with the specified primary key. If no matching item exists, the response will not contain an “Item” field. Uses eventually consistent reads by default.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/get_item.html

Parameters:
  • table – Name of the DynamoDB table.

  • key

    Primary key of the item to retrieve. Must include partition key and sort key (if table has one). Format:

    # Table with partition key only
    {"userId": {"S": "user123"}}
    
    # Table with partition + sort key
    {"userId": {"S": "user123"}, "timestamp": {"N": "1609459200"}}
    

  • kwargs

    Additional boto3 parameters:

    • ConsistentRead (bool):

      Use strongly consistent read instead of eventually consistent. Default: False.

    • ProjectionExpression (str):

      Comma-separated list of attributes to retrieve. Example: “username, email, lastLogin”

    • ExpressionAttributeNames (dict):

      Substitution tokens for attribute names in expressions. Example: {“#n”: “name”} (use when attribute is reserved word)

    • ReturnConsumedCapacity (str):

      Return capacity info: “INDEXES”, “TOTAL”, or “NONE”.

Returns:

Dictionary containing item data (if found) and metadata:

{
    "Item": {  # Present only if item exists
        "userId": {"S": "user123"},
        "username": {"S": "john_doe"},
        "email": {"S": "john@example.com"},
        "loginCount": {"N": "42"},
        "tags": {"SS": ["premium", "verified"]},
        "metadata": {
            "M": {
                "created": {"N": "1609459200"},
                "updated": {"N": "1640995200"}
            }
        },
        "active": {"BOOL": True}
    },
    "ConsumedCapacity": {  # If ReturnConsumedCapacity specified
        "TableName": "Users",
        "CapacityUnits": 0.5
    }
}

Raises:

AwsClientException – If the operation fails.

Example

dynamodb = DynamoDbClient(region="us-east-1")

# Get item by partition key
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}}
)

if "Item" in response:
    user = response["Item"]
    print(f"Username: {user['username']['S']}")
else:
    print("User not found")

# Get item with consistent read
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    ConsistentRead=True
)

# Get specific attributes only
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    ProjectionExpression="username, email, lastLogin"
)

# Get item from table with partition + sort key
response = dynamodb.get_item(
    table="OrderHistory",
    key={
        "userId": {"S": "user123"},
        "orderId": {"S": "order-456"}
    }
)

# Use attribute name substitution (when attribute is reserved word)
response = dynamodb.get_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    ProjectionExpression="#n, email",
    ExpressionAttributeNames={"#n": "name"}
)
update_item(table: str, key: Dict[str, Any], expression_attribute_values: Dict[str, Any], update_expression: str, **kwargs: Any) Dict[str, Any][source]#

Update an existing item’s attributes in a DynamoDB table.

Modifies an existing item or creates a new item if it doesn’t exist. Supports SET, REMOVE, ADD, and DELETE operations on attributes. Can perform conditional updates and return old/new attribute values.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/update_item.html

Parameters:
  • table – Name of the DynamoDB table.

  • key – Primary key of the item to update (partition key + sort key if applicable). Example: {“userId”: {“S”: “user123”}}

  • expression_attribute_values – Values to substitute in the update expression. Must be prefixed with colon (:). Example: {“:name”: {“S”: “John”}, “:count”: {“N”: “1”}}

  • update_expression

    Expression defining how to update the item. Supports:
    • SET: Set attribute value

    • REMOVE: Remove attribute

    • ADD: Increment number or add to set

    • DELETE: Remove from set

    Example: “SET #name = :name, loginCount = loginCount + :count”

  • kwargs

    Additional boto3 parameters:

    • ConditionExpression (str):

      Condition that must be true for update to proceed. Example: “attribute_exists(userId)” or “version = :oldVersion”

    • ExpressionAttributeNames (dict):

      Substitution tokens for attribute names (use for reserved words). Example: {“#name”: “name”, “#status”: “status”}

    • ReturnValues (str):

      What values to return: “NONE” (default), “ALL_OLD”, “UPDATED_OLD”, “ALL_NEW”, “UPDATED_NEW”.

    • ReturnConsumedCapacity (str):

      Return capacity info: “INDEXES”, “TOTAL”, or “NONE”.

    • ReturnItemCollectionMetrics (str):

      Return collection metrics: “SIZE” or “NONE”.

Returns:

Dictionary containing updated attributes (based on ReturnValues):

{
    "Attributes": {  # Present based on ReturnValues
        "userId": {"S": "user123"},
        "username": {"S": "john_doe"},
        "loginCount": {"N": "43"},
        "lastLogin": {"N": "1640995200"}
    },
    "ConsumedCapacity": {  # If ReturnConsumedCapacity specified
        "TableName": "Users",
        "CapacityUnits": 1.0
    }
}

Raises:

AwsClientException – If the operation fails or condition is not met.

Example

dynamodb = DynamoDbClient(region="us-east-1")

# Simple SET operation
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET lastLogin = :timestamp",
    expression_attribute_values={
        ":timestamp": {"N": "1640995200"}
    }
)

# Multiple operations with attribute name substitution
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET #name = :name, email = :email, loginCount = loginCount + :inc",
    expression_attribute_values={
        ":name": {"S": "John Doe"},
        ":email": {"S": "john@example.com"},
        ":inc": {"N": "1"}
    },
    ExpressionAttributeNames={
        "#name": "name"  # 'name' is a reserved word
    }
)

# Conditional update with return values
response = dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET accountBalance = accountBalance - :amount",
    expression_attribute_values={
        ":amount": {"N": "50"},
        ":min": {"N": "0"}
    },
    ConditionExpression="accountBalance >= :min",
    ReturnValues="ALL_NEW"
)
print(f"New balance: {response['Attributes']['accountBalance']['N']}")

# REMOVE operation
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="REMOVE temporaryToken",
    expression_attribute_values={}
)

# ADD to set
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="ADD tags :newTag",
    expression_attribute_values={
        ":newTag": {"SS": ["premium"]}
    }
)

# Complex update with multiple clauses
dynamodb.update_item(
    table="Users",
    key={"userId": {"S": "user123"}},
    update_expression="SET #status = :active, updatedAt = :now REMOVE oldField ADD loginCount :one",
    expression_attribute_values={
        ":active": {"S": "ACTIVE"},
        ":now": {"N": "1640995200"},
        ":one": {"N": "1"}
    },
    ExpressionAttributeNames={
        "#status": "status"
    },
    ReturnValues="UPDATED_NEW"
)

ECS Client#

AWS ECS (Elastic Container Service) client wrapper.

This module provides a high-level interface for interacting with AWS ECS, including service management, task operations, and container orchestration.

class core_aws.services.ecs.client.EcsClient(region: str, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS ECS (Elastic Container Service).

This client provides methods for managing ECS services, tasks, and container instances. Supports service updates, task listing, and service description operations with automatic error handling.

Example

# Initialize client
ecs = EcsClient(region="us-east-1")

# List services in a cluster
services = ecs.list_services(cluster="my-cluster")
print(f"Services: {services['serviceArns']}")

# Describe services
details = ecs.describe_services(
    cluster="my-cluster",
    services=["my-service"]
)

# Update service desired count
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    desiredCount=3
)

# List running tasks
tasks = ecs.list_tasks(
    cluster="my-cluster",
    serviceName="my-service"
)
client: mypy_boto3_ecs.client.ECSClient#
__init__(region: str, **kwargs: Any) None[source]#

Initialize the ECS client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • kwargs – Additional arguments passed to boto3.client().

list_services(cluster: str, **kwargs: Any) Dict[str, Any][source]#

List services in an ECS cluster. Returns a list of service ARNs in the specified cluster. Results can be filtered by launch type and scheduling strategy. Supports pagination for clusters with many services.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/list_services.html

Parameters:
  • cluster – Short name or full ARN of the cluster. If not specified, uses the default cluster.

  • kwargs

    Additional boto3 parameters:

    • nextToken (str):

      Token for pagination. Use the value from a previous response to get the next page of results.

    • maxResults (int):

      Maximum number of results per page (1-100). Default: 10.

    • launchType (str):

      Filter by launch type: ‘EC2’, ‘FARGATE’, or ‘EXTERNAL’.

    • schedulingStrategy (str):

      Filter by scheduling strategy: ‘REPLICA’ or ‘DAEMON’.

Returns:

Dictionary containing service ARNs and pagination info:

{
    "serviceArns": [
        "arn:aws:ecs:us-west-2:123456789012:service/my-cluster/my-service"
    ],
    "nextToken": "string",  # Present if more results available
    "ResponseMetadata": {...}
}

Raises:

AwsClientException – If the operation fails.

Example

ecs = EcsClient(region="us-east-1")

# List all services in cluster
services = ecs.list_services(cluster="my-cluster")
print(f"Found {len(services['serviceArns'])} services")

# List Fargate services with pagination
services = ecs.list_services(
    cluster="my-cluster",
    launchType="FARGATE",
    maxResults=50
)

# Handle pagination
all_service_arns = []
response = ecs.list_services(cluster="my-cluster")
all_service_arns.extend(response["serviceArns"])

while "nextToken" in response:
    response = ecs.list_services(
        cluster="my-cluster",
        nextToken=response["nextToken"]
    )
    all_service_arns.extend(response["serviceArns"])
describe_services(cluster: str, services: List[str], **kwargs: Any) Dict[str, Any][source]#

Retrieve detailed information about ECS services. Returns comprehensive information about specified services including configuration, deployment status, task definition, load balancers, service registries, and more. Up to 10 services can be described in a single operation.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/describe_services.html

Parameters:
  • cluster – Short name or full ARN of the cluster hosting the services. Required if services are in non-default cluster.

  • services – List of service names or ARNs to describe (max 10 per request). Example: [“my-service”, “another-service”]

  • kwargs

    Additional boto3 parameters:

    • include (list):

      Additional information to include in response. Options: [‘TAGS’] to include resource tags.

Returns:

Dictionary containing service details:

{
    "services": [
        {
            "serviceName": "my-service",
            "serviceArn": "arn:aws:ecs:...",
            "clusterArn": "arn:aws:ecs:...",
            "status": "ACTIVE",
            "desiredCount": 2,
            "runningCount": 2,
            "pendingCount": 0,
            "launchType": "FARGATE",
            "taskDefinition": "arn:aws:ecs:...",
            "deployments": [...],
            "events": [...],
            "loadBalancers": [...],
            "networkConfiguration": {...},
            "tags": [...]  # If include=['TAGS']
        }
    ],
    "failures": [
        {
            "arn": "arn:aws:ecs:...",
            "reason": "MISSING"
        }
    ]
}

Raises:

AwsClientException – If the operation fails.

Example

ecs = EcsClient(region="us-east-1")

# Describe single service
details = ecs.describe_services(
    cluster="my-cluster",
    services=["my-service"]
)

service = details["services"][0]
print(f"Service: {service['serviceName']}")
print(f"Running: {service['runningCount']}/{service['desiredCount']}")
print(f"Status: {service['status']}")

# Describe multiple services with tags
details = ecs.describe_services(
    cluster="my-cluster",
    services=["service-1", "service-2", "service-3"],
    include=["TAGS"]
)

for service in details["services"]:
    print(f"{service['serviceName']}: {service['taskDefinition']}")
    for tag in service.get("tags", []):
        print(f"  {tag['key']}: {tag['value']}")

# Check for failures
if details["failures"]:
    for failure in details["failures"]:
        print(f"Failed: {failure['arn']} - {failure['reason']}")
update_service(service: str, **kwargs: Any) Dict[str, Any][source]#

Update an ECS service configuration. Modifies service parameters including desired count, task definition, deployment configuration, network configuration, and task placement strategies. For services using rolling update (ECS) deployment controller.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/update_service.html

Parameters:
  • service – Name or full ARN of the service to update.

  • kwargs

    Additional boto3 parameters (commonly used):

    • cluster (str):

      Cluster name or ARN. Default: default cluster.

    • desiredCount (int):

      Number of task instantiations to place and keep running.

    • taskDefinition (str):

      Task definition family and revision (family:revision) or full ARN.

    • deploymentConfiguration (dict):
      Deployment parameters:
      • minimumHealthyPercent (int): Lower limit (0-100)

      • maximumPercent (int): Upper limit (100-200)

      • deploymentCircuitBreaker (dict): Circuit breaker config

    • networkConfiguration (dict):

      Network configuration for FARGATE launch type.

    • platformVersion (str):

      Platform version for Fargate tasks (e.g., “LATEST”, “1.4.0”).

    • forceNewDeployment (bool):

      Force new deployment even if no changes.

    • healthCheckGracePeriodSeconds (int):

      Grace period for load balancer health checks (0-2147483647).

    • enableExecuteCommand (bool):

      Enable ECS Exec for debugging.

    • capacityProviderStrategy (list):

      Capacity provider strategy to use.

    • placementConstraints (list):

      Task placement constraints.

    • placementStrategy (list):

      Task placement strategies.

Returns:

Dictionary containing updated service information:

{
    "service": {
        "serviceName": "my-service",
        "serviceArn": "arn:aws:ecs:...",
        "taskDefinition": "arn:aws:ecs:...",
        "desiredCount": 3,
        "runningCount": 2,
        "pendingCount": 1,
        "deployments": [
            {
                "status": "PRIMARY",
                "taskDefinition": "arn:aws:ecs:...",
                "desiredCount": 3,
                "runningCount": 2
            }
        ],
        "events": [...]
    }
}

Raises:

AwsClientException – If the operation fails.

Example

ecs = EcsClient(region="us-east-1")

# Update desired count
result = ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    desiredCount=5
)
print(f"Updated to {result['service']['desiredCount']} tasks")

# Update task definition
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    taskDefinition="my-task:2"
)

# Force new deployment with deployment configuration
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    forceNewDeployment=True,
    deploymentConfiguration={
        "minimumHealthyPercent": 50,
        "maximumPercent": 200,
        "deploymentCircuitBreaker": {
            "enable": True,
            "rollback": True
        }
    }
)

# Enable ECS Exec for debugging
ecs.update_service(
    service="my-service",
    cluster="my-cluster",
    enableExecuteCommand=True
)
list_tasks(**kwargs: Any) Dict[str, Any][source]#

List tasks in an ECS cluster. Returns a list of task ARNs. Filter by cluster, task definition family, container instance, launch type, starter principal, or desired status. Recently stopped tasks appear in results for at least one hour.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/list_tasks.html

Parameters:

kwargs

Boto3 parameters for filtering:

  • cluster (str):

    Cluster name or ARN. Default: default cluster.

  • containerInstance (str):

    Container instance ID or ARN to filter by.

  • family (str):

    Task definition family name to filter by.

  • serviceName (str):

    Service name to filter tasks belonging to that service.

  • desiredStatus (str):

    Filter by task status: ‘RUNNING’ (default) or ‘STOPPED’. Use ‘STOPPED’ for debugging failed/finished tasks.

  • launchType (str):

    Filter by launch type: ‘EC2’, ‘FARGATE’, or ‘EXTERNAL’.

  • startedBy (str):

    Filter by principal that started the task.

  • nextToken (str):

    Pagination token from previous response.

  • maxResults (int):

    Maximum results per page (1-100). Default: 100.

Returns:

Dictionary containing task ARNs and pagination info:

{
    "taskArns": [
        "arn:aws:ecs:us-west-2:123456789012:task/my-cluster/abc123",
        "arn:aws:ecs:us-west-2:123456789012:task/my-cluster/def456"
    ],
    "nextToken": "string",  # Present if more results available
    "ResponseMetadata": {...}
}

Raises:

AwsClientException – If the operation fails.

Example

ecs = EcsClient(region="us-east-1")

# List all running tasks in cluster
tasks = ecs.list_tasks(cluster="my-cluster")
print(f"Running tasks: {len(tasks['taskArns'])}")

# List tasks for specific service
service_tasks = ecs.list_tasks(
    cluster="my-cluster",
    serviceName="my-service"
)

# List stopped tasks (for debugging)
stopped_tasks = ecs.list_tasks(
    cluster="my-cluster",
    desiredStatus="STOPPED",
    maxResults=50
)

# List tasks by task definition family
family_tasks = ecs.list_tasks(
    cluster="my-cluster",
    family="my-task-family"
)

# List Fargate tasks started by specific principal
fargate_tasks = ecs.list_tasks(
    cluster="my-cluster",
    launchType="FARGATE",
    startedBy="arn:aws:iam::123456789012:user/admin"
)

# Handle pagination
all_task_arns = []
response = ecs.list_tasks(cluster="my-cluster")
all_task_arns.extend(response["taskArns"])

while "nextToken" in response:
    response = ecs.list_tasks(
        cluster="my-cluster",
        nextToken=response["nextToken"]
    )
    all_task_arns.extend(response["taskArns"])

print(f"Total tasks: {len(all_task_arns)}")

Kinesis Client#

AWS Kinesis Data Streams client wrapper.

This module provides a high-level interface for interacting with AWS Kinesis, including single and batch record operations with automatic retry logic.

class core_aws.services.kinesis.client.KinesisClient(region: str, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS Kinesis Data Streams.

This client provides methods for writing records to Kinesis data streams, supporting both single and batch operations. Includes intelligent retry logic for handling transient failures and throughput exceeded errors.

Example

# Initialize client
kinesis = KinesisClient(region="us-east-1")

# Put single record
response = kinesis.put_record(
    stream_name="my-stream",
    data=b'{"event": "user_signup", "user_id": 123}',
    partition_key="user-123"
)
print(f"Shard: {response['ShardId']}")

# Put multiple records with retry
records = [
    {"event": "login", "user_id": 1},
    {"event": "purchase", "user_id": 2}
]
kinesis.send_records(
    records=records,
    stream_name="my-stream",
    partition_key="events"
)
client: mypy_boto3_kinesis.client.KinesisClient#
__init__(region: str, **kwargs: Any) None[source]#

Initialize the Kinesis client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • kwargs – Additional arguments passed to boto3.client().

create_stream(stream_name: str, shard_count: int | None = None, stream_mode_details: Dict[str, str] | None = None, **kwargs: Any) Dict[str, Any][source]#

Create a new Kinesis data stream with specified capacity. Creates either a provisioned stream (with explicit shard count) or an on-demand stream (auto-scaling). Stream becomes ACTIVE within seconds and can start accepting data immediately.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/create_stream.html

Parameters:
  • stream_name – Name of the stream to create. Must be: - 1-128 characters long - Alphanumeric, hyphens, underscores, and periods only - Cannot use “aws:” prefix (reserved) - Unique within the AWS account and region

  • shard_count

    Number of shards for provisioned capacity mode. Each shard provides: - Write: 1 MB/sec or 1,000 records/sec - Read: 2 MB/sec or 5 read transactions/sec

    Required for provisioned mode, must be None for on-demand mode. Can be increased/decreased later using UpdateShardCount API.

  • stream_mode_details

    Stream capacity mode configuration. Dict with single key:

    • StreamMode (str): Capacity mode, either:
      • ”PROVISIONED”: Fixed shard count (requires shard_count parameter)

      • ”ON_DEMAND”: Auto-scaling capacity (shard_count must be None)

    Default behavior if omitted:
    • With shard_count: Creates provisioned stream

    • Without shard_count: Creates on-demand stream

  • kwargs

    Additional boto3 parameters:

    • Tags (dict): Resource tags for the stream. Dictionary of key-value pairs: {“Environment”: “Production”, “Application”: “Analytics”}

Returns:

Empty dictionary on success. Stream creation is asynchronous. Use describe_stream() or wait_until_active() to check status.

Raises:

AwsClientException – If stream creation fails (e.g., name already exists, invalid parameters, resource limits exceeded).

Example

kinesis = KinesisClient(region="us-east-1")

# Create provisioned stream with 2 shards
kinesis.create_stream(
    stream_name="my-stream",
    shard_count=2
)

# Create on-demand stream (auto-scaling)
kinesis.create_stream(
    stream_name="my-on-demand-stream",
    stream_mode_details={"StreamMode": "ON_DEMAND"}
)

# Create stream with explicit mode specification
kinesis.create_stream(
    stream_name="my-provisioned-stream",
    shard_count=5,
    stream_mode_details={"StreamMode": "PROVISIONED"}
)

# Create stream with tags
kinesis.create_stream(
    stream_name="tagged-stream",
    shard_count=1,
    Tags={
        "Environment": "Production",
        "Application": "Analytics",
        "CostCenter": "Engineering"
    }
)

# Wait for stream to become active
waiter = kinesis.client.get_waiter('stream_exists')
waiter.wait(StreamName="my-stream")

print("Stream is now active and ready to use")

Note

  • Stream creation is asynchronous (completes in seconds)

  • Provisioned mode: Fixed cost based on shard hours

  • On-demand mode: Pay per GB ingested/retrieved, auto-scales

  • Default retention: 24 hours (configurable up to 365 days)

  • Maximum 50 streams per region by default (soft limit)

  • Stream name must be unique within account and region

put_record(stream_name: str, data: bytes, partition_key: str, **kwargs: Any) Dict[str, Any][source]#

Write a single data record to a Kinesis data stream. Sends one record at a time for real-time ingestion. Each shard supports up to 1,000 records/second with a maximum write throughput of 1 MiB/second.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_record.html

Parameters:
  • stream_name – Name of the Kinesis data stream.

  • data – Data payload (bytes). Base64-encoded when serialized. Maximum size: 1 MiB (including partition key).

  • partition_key – Key determining which shard receives the record. Records with the same partition key go to the same shard.

  • kwargs

    Additional boto3 parameters:

    • ExplicitHashKey (str): Hash value to explicitly determine target shard, overriding partition key hash.

    • SequenceNumberForOrdering (str): Guarantees strictly increasing sequence numbers from same client/partition key.

Returns:

Dictionary containing shard placement information:

{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "EncryptionType": "NONE" | "KMS"
}

Raises:

AwsClientException – If record write fails.

Example

kinesis = KinesisClient(region="us-east-1")

# Put single record
response = kinesis.put_record(
    stream_name="clickstream",
    data=b'{"event": "page_view", "page": "/home"}',
    partition_key="user-123"
)
print(f"Written to {response['ShardId']}")

# With explicit hash key
kinesis.put_record(
    stream_name="orders",
    data=b'{"order_id": "12345"}',
    partition_key="order-12345",
    ExplicitHashKey="123456789"
)
put_records(stream_name: str, records: List[Dict[str, Any]]) Dict[str, Any][source]#

Write multiple data records to a Kinesis data stream in a single request. Batch operation for writing up to 500 records at once. More efficient than multiple put_record() calls. Each record can be up to 1 MiB, with a total request limit of 5 MiB including partition keys. Each shard supports up to 1,000 records/second with 1 MiB/second throughput.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

Parameters:
  • stream_name – Name of the Kinesis data stream.

  • records

    List of record dictionaries (max 500). Each record structure:

    {
        "Data": b"bytes",  # Required: The data blob (base64-encoded)
        "PartitionKey": "string",  # Required: Determines shard
        "ExplicitHashKey": "string"  # Optional: Override partition key hash
    }
    

Returns:

Dictionary containing batch operation results:

{
    "FailedRecordCount": 0,  # Number of failed records
    "Records": [
        {
            "SequenceNumber": "string",
            "ShardId": "string",
            "ErrorCode": "string",  # Present if failed
            "ErrorMessage": "string"  # Present if failed
        },
    ],
    "EncryptionType": "NONE" | "KMS"
}

Raises:

AwsClientException – If the batch write operation fails.

Example

kinesis = KinesisClient(region="us-east-1")

# Prepare batch records
records = [
    {
        "Data": b'{"event": "login", "user_id": 1}',
        "PartitionKey": "user-1"
    },
    {
        "Data": b'{"event": "purchase", "user_id": 2}',
        "PartitionKey": "user-2"
    }
]

# Put records
response = kinesis.put_records(
    stream_name="events",
    records=records
)

# Check for failures
if response["FailedRecordCount"] > 0:
    for i, record in enumerate(response["Records"]):
        if "ErrorCode" in record:
            print(f"Record {i} failed: {record['ErrorMessage']}")

Note

For automatic retry logic with failed records, use send_records() instead, which handles transient failures and throughput exceeded errors.

send_records(records: List[Dict[str, Any]], stream_name: str, partition_key: str, records_per_request: int = 500, max_attempts: int = 10, interval_between_attempt: int = 1) None[source]#

Send records to Kinesis with automatic retry logic. High-level wrapper around put_records() that automatically handles transient failures and throughput exceeded errors. Converts Python dictionaries to JSON, batches records, and retries failed records with exponential backoff.

Parameters:
  • records – List of record dictionaries to send. Each dict will be JSON-serialized. Example: [{“event”: “login”, “user_id”: 123}, …]

  • stream_name – Name of the target Kinesis data stream.

  • partition_key – Partition key for all records. All records will be sent to the same shard based on this key.

  • records_per_request – Maximum number of records per batch request. Default: 500 (AWS maximum).

  • max_attempts – Maximum retry attempts for failed records. Default: 10.

  • interval_between_attempt – Base delay in seconds between retry attempts. Uses exponential backoff (delay = interval * attempt_number). Default: 1 second.

Raises:

AwsClientException – If records still fail after max_attempts retries.

Example

kinesis = KinesisClient(region="us-east-1")

# Send event records with automatic retry
events = [
    {"event": "login", "user_id": 123, "timestamp": "2024-01-01T10:00:00Z"},
    {"event": "purchase", "user_id": 456, "amount": 99.99},
    {"event": "logout", "user_id": 123}
]

kinesis.send_records(
    records=events,
    stream_name="user-events",
    partition_key="events"
)

# With custom retry settings
kinesis.send_records(
    records=events,
    stream_name="critical-events",
    partition_key="events",
    records_per_request=100,  # Smaller batches
    max_attempts=20,  # More retries
    interval_between_attempt=2  # Longer delays
)

Note

  • Records are automatically JSON-serialized using json.dumps()

  • Failed records are automatically retried with exponential backoff

  • Delay between retries: 1s, 2s, 3s, 4s, … (linear backoff: interval * attempt)

  • All records use the same partition key (same shard)

_send_to_kinesis_stream(records: List[Dict[str, Any]], stream_name: str, records_per_request: int = 500, max_attempts: int = 10, interval_between_attempt: int = 1) None[source]#

Internal implementation for sending records with retry logic.

Handles batching, retry logic with exponential backoff, and error tracking. Called by send_records() to perform the actual data transfer. This method splits large record sets into batches and retries failed records automatically.

Parameters:
  • records

    List of formatted record dictionaries ready for Kinesis API.

    [{
        "Data": b"bytes",  # JSON-serialized data (bytes or string)
        "PartitionKey": "string",  # Required
        "ExplicitHashKey": "string"  # Optional
    }]
    

  • stream_name – Name of the Kinesis data stream.

  • records_per_request – Number of records per batch (max 500). Default: 500.

  • max_attempts – Maximum retry attempts for failed records. Default: 10.

  • interval_between_attempt – Base interval in seconds for exponential backoff. Default: 1.

Raises:

AwsClientException – If any records still fail after max_attempts retries.

Algorithm:

  1. Split records into batches of records_per_request size

  2. For each batch:

    1. Send to Kinesis using put_records()

    2. Check for failed records in response

    3. If failures exist and attempts remain:

      • Wait: interval * attempt_number seconds

      • Extract only failed records

      • Retry the failed records

    4. Repeat until success or max_attempts reached

  3. Raise exception if any records still failed

Note

  • This is an internal method. Use send_records() instead.

  • Uses linear backoff: 1s, 2s, 3s, 4s, 5s, …

  • Only retries records that actually failed (not entire batch)

Lambdas#

The following links contain information related to resources for Lambda Functions…

S3 Client#

AWS S3 (Simple Storage Service) client wrapper.

This module provides a high-level interface for interacting with AWS S3, including bucket operations, object upload/download, and batch operations.

class core_aws.services.s3.client.S3ACL(*values)[source]#

Bases: StrEnum

Enum for S3 canned ACL (Access Control List) values.

These are predefined access control lists that define common access patterns for S3 buckets and objects. AWS recommends using bucket policies and IAM policies instead of ACLs for most access control needs.

Reference:

https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl

PRIVATE = 'private'#

Owner gets FULL_CONTROL. No one else has access rights.

PUBLIC_READ = 'public-read'#

Owner gets FULL_CONTROL. AllUsers group gets READ access.

PUBLIC_READ_WRITE = 'public-read-write'#

Owner gets FULL_CONTROL. AllUsers group gets READ and WRITE access.

AUTHENTICATED_READ = 'authenticated-read'#

Owner gets FULL_CONTROL. AuthenticatedUsers group gets READ access.

static _generate_next_value_(name, start, count, last_values)#

Return the lower-cased version of the member name.

class core_aws.services.s3.client.S3ObjectOwnership(*values)[source]#

Bases: StrEnum

Enum for S3 object ownership settings.

Controls object ownership and whether ACLs are enabled for the bucket. This setting applies to all objects in the bucket.

Reference:

https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html

BUCKET_OWNER_ENFORCED = 'BucketOwnerEnforced'#

ACLs are disabled. Bucket owner automatically owns and has full control over all objects in the bucket. All access control must be done via bucket policies and IAM policies.

BUCKET_OWNER_PREFERRED = 'BucketOwnerPreferred'#

Bucket owner owns objects if they are uploaded with the bucket-owner-full-control canned ACL. Otherwise, object writer retains ownership.

OBJECT_WRITER = 'ObjectWriter'#

Object uploader retains ownership. The AWS account that uploads an object owns the object, has full control over it, and can grant other users access to it through ACLs.

static _generate_next_value_(name, start, count, last_values)#

Return the lower-cased version of the member name.

class core_aws.services.s3.client.S3Client(signature_version: str = 's3v4', **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS S3 (Simple Storage Service).

This client provides methods for managing S3 buckets and objects, including upload, download, copy, delete, and list operations. Supports both single-part and multipart operations with automatic pagination handling.

Example

# Initialize client
s3 = S3Client(region_name="us-east-1")

# Check if bucket exists
if s3.does_bucket_exist("my-bucket"):
    print("Bucket exists")

# Upload a file
s3.upload_file(
    bucket="my-bucket",
    key="data/file.csv",
    path="/local/path/file.csv"
)

# List all objects
for obj in s3.list_all_objects("my-bucket", "data/"):
    print(f"{obj['Key']}: {obj['Size']} bytes")

# Download a file
s3.download_file(
    bucket="my-bucket",
    key="data/file.csv",
    local_path="/local/download/file.csv"
)
client: mypy_boto3_s3.client.S3Client#
__init__(signature_version: str = 's3v4', **kwargs: Any) None[source]#

Initialize the S3 client.

Parameters:
  • signature_version – S3 signature version. Default: “s3v4” (recommended). Options: “s3v4” (AWS Signature Version 4), “s3” (legacy).

  • kwargs

    Additional arguments passed to boto3.client():
    • region_name: AWS region (e.g., ‘us-east-1’)

    • endpoint_url: Custom endpoint (for LocalStack, MinIO, etc.)

    • aws_access_key_id, aws_secret_access_key, aws_session_token

Example

# Standard S3
s3 = S3Client(region_name="us-east-1")

# With custom endpoint (LocalStack)
s3_local = S3Client(
    endpoint_url="http://localhost:4566",
    region_name="us-east-1"
)

# Legacy signature version
s3_legacy = S3Client(signature_version="s3")
create_bucket(bucket: str, acl: S3ACL | str | None = None, object_ownership: S3ObjectOwnership | str | None = None, **kwargs: Any) Dict[str, Any][source]#

Create a new S3 bucket in the specified region. Bucket names must be globally unique across all AWS accounts. For regions outside us-east-1, a LocationConstraint must be specified.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/create_bucket.html

Parameters:
  • bucket – Name of the bucket to create. Must be: - 3-63 characters long - Lowercase letters, numbers, hyphens, and periods only - Must start and end with a letter or number - Must not be formatted as an IP address - Globally unique across all AWS accounts

  • acl

    Optional canned ACL to apply to the bucket. Options:

    • private (default): Owner gets FULL_CONTROL. No one else has access.

    • public-read: Owner gets FULL_CONTROL. AllUsers get READ access.

    • public-read-write: Owner gets FULL_CONTROL. AllUsers get READ and WRITE access.

    • authenticated-read: Owner gets FULL_CONTROL. AuthenticatedUsers get READ access.

    Note: AWS recommends using bucket policies instead of ACLs for access control.

  • object_ownership

    Optional object ownership setting. Controls object ownership and ACL behavior:

    • BucketOwnerEnforced: ACLs disabled, bucket owner owns all objects

    • BucketOwnerPreferred: Bucket owner owns objects if uploaded with bucket-owner-full-control ACL

    • ObjectWriter: Object uploader retains ownership (default legacy behavior)

  • kwargs

    Additional boto3 parameters:

    • CreateBucketConfiguration (dict): Bucket configuration with:
      • LocationConstraint (str): AWS region (e.g., ‘us-west-2’). Required for all regions except us-east-1.

    • GrantFullControl (str): Grants READ, WRITE, READ_ACP, and WRITE_ACP permissions

    • GrantRead (str): Grants READ permission

    • GrantReadACP (str): Grants READ_ACP permission

    • GrantWrite (str): Grants WRITE permission

    • GrantWriteACP (str): Grants WRITE_ACP permission

    • ObjectLockEnabledForBucket (bool): Enable object lock (requires versioning)

Returns:

Dictionary containing bucket creation response:

{
    "Location": "string"  # URI of the created bucket
}

Raises:

AwsClientException – If bucket creation fails (e.g., name already exists, invalid name, permission denied).

Example

from core_aws.services.s3.client import S3Client, S3ACL, S3ObjectOwnership

s3 = S3Client(region_name="us-east-1")

# Create bucket in us-east-1 (default region)
response = s3.create_bucket(bucket="my-unique-bucket-name")
print(f"Created bucket at: {response['Location']}")

# Create bucket in a specific region
s3_west = S3Client(region_name="us-west-2")
response = s3_west.create_bucket(
    bucket="my-west-bucket",
    CreateBucketConfiguration={
        "LocationConstraint": "us-west-2"
    }
)

# Create bucket with object ownership controls (using enum)
response = s3.create_bucket(
    bucket="my-controlled-bucket",
    object_ownership=S3ObjectOwnership.BUCKET_OWNER_ENFORCED
)

# Create private bucket with explicit ACL (using enum)
response = s3.create_bucket(
    bucket="my-private-bucket",
    acl=S3ACL.PRIVATE
)

# Create public-read bucket (string also works)
response = s3.create_bucket(
    bucket="my-public-bucket",
    acl="public-read"
)

# Create bucket with object lock enabled
response = s3.create_bucket(
    bucket="my-locked-bucket",
    ObjectLockEnabledForBucket=True
)

Note

  • Bucket names must be globally unique across all AWS accounts

  • For regions other than us-east-1, must specify CreateBucketConfiguration

  • Bucket creation is eventually consistent

  • Maximum 100 buckets per AWS account by default

does_bucket_exist(bucket: str) bool[source]#

Check if an S3 bucket exists and is accessible. Performs a lightweight list operation to verify bucket existence and access permissions.

Parameters:

bucket – Name of the S3 bucket to check.

Returns:

True if bucket exists and is accessible, False if bucket doesn’t exist.

Raises:

AwsClientException – If access is denied or other errors occur.

Example

s3 = S3Client(region_name="us-east-1")

if s3.does_bucket_exist("my-bucket"):
    print("Bucket exists and is accessible")
else:
    print("Bucket does not exist")
list_objects(bucket: str, prefix: str = '', **kwargs: Any) Dict[str, Any][source]#

List objects in an S3 bucket (single page, up to 1000 objects). Returns a single page of results. For listing all objects with automatic pagination, use list_all_objects() instead.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.list_objects

Parameters:
  • bucket – Name of the S3 bucket.

  • prefix – Object key prefix filter (e.g., “folder/subfolder/”).

  • kwargs – Additional boto3 parameters (Delimiter, MaxKeys, Marker, etc.).

Returns:

Dictionary containing object listing with structure:

{
    "IsTruncated": True|False,
    "Marker": "string",
    "NextMarker": "string",
    "Contents": [
        {
            "Key": "string",
            "LastModified": datetime(2015, 1, 1),
            "ETag": "string",
            "ChecksumAlgorithm": [
                "CRC32"|"CRC32C"|"SHA1"|"SHA256",
            ],
            "Size": 123,
            "StorageClass": "STANDARD" | ... | "GLACIER" ",
            "Owner": {
                "DisplayName": "string",
                "ID": "string"
            },
            "RestoreStatus": {
                "IsRestoreInProgress": True|False,
                "RestoreExpiryDate": datetime(2015, 1, 1)
            }
        },
    ],
    "Name": "string",
    "Prefix": "string",
    "Delimiter": "string",
    "MaxKeys": 123,
    "CommonPrefixes": [
        {
            "Prefix": "string"
        },
    ],
    "EncodingType": "url",
    "RequestCharged": "requester"
}

list_all_objects(bucket: str, prefix: str = '', **kwargs: Any) Iterator[Dict[str, Any]][source]#

Retrieve information of all objects (files) into s3 bucket. This way you don”t need to worry about pagination…

Parameters:
  • bucket – Bucket name.

  • prefix – Objects prefix.

  • kwargs

Returns:

An iterator that contains dictionaries with the following structure

{
    "Key": "string",
    "LastModified": datetime(2015, 1, 1),
    "ETag": "string",
    "ChecksumAlgorithm": [
        "CRC32"|"CRC32C"|"SHA1"|"SHA256",
    ],
    "Size": 123,
    "StorageClass": "STANDARD" | ... | "GLACIER" ",
    "Owner": {
        "DisplayName": "string",
        "ID": "string"
    },
    "RestoreStatus": {
        "IsRestoreInProgress": True|False,
        "RestoreExpiryDate": datetime(2015, 1, 1)
    }
}

upload_file(bucket: str, key: str, path: str, **kwargs: Any) None[source]#

Upload a file to AWS bucket… https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_file.html

Parameters:
  • bucket – The bucket name of the bucket containing the object.

  • key – Key/path of the object.

  • path – (str) – The path to the file to upload.

  • kwargs

Returns:

upload_object(bucket: str, key: str, data: BytesIO, **kwargs) None[source]#

Upload a file-like object (BytesIO) to AWS bucket… https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_fileobj

Parameters:
  • bucket – The bucket name of the bucket containing the object.

  • key – Key/path of the object.

  • data – A file-like object to upload. At a minimum, it must implement the read method, and must return bytes.

  • kwargs

download_file(bucket: str, key: str, local_path: str, **kwargs) str[source]#

Download file from the S3 bucket to the local path… https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/download_file.html

Parameters:
  • local_path – Path to save the file locally.

  • bucket – The bucket name of the bucket containing the object.

  • key – Key/path of the object.

  • kwargs

Returns:

The local file where the file was stored.

download_object(buffer: BytesIO, bucket: str, key: str, **kwargs) None[source]#

Download an object from S3 to a file-like object. The file-like object must be in binary mode…

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/download_fileobj.html

Parameters:
  • bucket – The bucket name of the bucket containing the object.

  • key – Key/path of the object.

  • buffer – (a file-like object) – A file-like object to download into. At a minimum, it must implement the write method and must accept bytes.

  • kwargs

copy(copy_source: Dict, bucket: str, key: str, **kwargs) None[source]#

Copy an object from one S3 location to another. This is a managed transfer which will perform a multipart copy in multiple threads if necessary…

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.copy

Parameters:
  • copy_source (dict) – The name of the source bucket, key name of the source object, and optional version ID of the source object. The dictionary format is: {"Bucket": "bucket", "Key": "key", "VersionId": "id"}. Note that the VersionId key is optional and may be omitted.

  • bucket (str) – The name of the bucket to copy to

  • key (str) – The name of the key to copy to

  • kwargs

    Extra arguments. ExtraArgs: dict – Extra arguments that may be passed to the client operation

    Callback: function

    A method which takes a number of bytes transferred to be periodically called during the copy.

    SourceClient: botocore or boto3 Client

    The client to be used for operation that may happen at the source object. For example, this client is used for the head_object that determines the size of the copy. If no client is provided, the current client is used as the client for the source object.

    ConfigLoader: boto3.s3.transfer.TransferConfig

    The transfer configuration to be used when performing the copy.

delete_object(bucket: str, key: str, **kwargs) Dict[source]#

Delete objects from a bucket using a single request… https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_object.html

Parameters:
  • bucket – The bucket name of the bucket containing the object.

  • key – Key name of the object to delete.

  • kwargs

Returns:

{
    "DeleteMarker": True|False,
    "VersionId": "string",
    "RequestCharged": "requester"
}

delete_objects(bucket: str, objects: List[Dict], **kwargs) Dict[source]#

Delete objects from a bucket using a single request… https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_objects.html

Parameters:
  • bucket

  • objects

    The objects to delete… Object Identifier is unique value to identify objects. Key (string) [REQUIRED] -> Key name of the object to delete. VersionId (string) -> VersionId for the specific version of the object to delete.

    Example…

    [{
        "Key": "some_file.csv",
        "VersionId": "6LGg7gQLhY41.maGB5Z6SWW.dcq0vx7b",
    }]
    

  • kwargs

Returns:

{
    "Deleted": [
        {
            "Key": "string",
            "VersionId": "string",
            "DeleteMarker": True|False,
            "DeleteMarkerVersionId": "string"
        },
    ],
    "RequestCharged": "requester",
    "Errors": [
        {
            "Key": "string",
            "VersionId": "string",
            "Code": "string",
            "Message": "string"
        },
    ]
}

SNS Client#

AWS Simple Notification Service (SNS) client wrapper.

This module provides a high-level interface for interacting with AWS SNS, including message publishing to topics, batch operations, and SMS delivery.

class core_aws.services.sns.client.SnsMessage(Message: Dict | str, Id: str | None = None, Subject: str | None = None, MessageStructure: str | None = None, MessageAttributes: Dict | None = None, MessageDeduplicationId: str | None = None, MessageGroupId: str | None = None)[source]#

Bases: object

Represents an SNS message to be published to a topic, phone number, or endpoint.

This class encapsulates all parameters needed to publish a message via SNS, supporting both simple string messages and structured JSON messages for multiprotocol delivery.

Example

# Simple text message
msg = SnsMessage(Message="Hello, SNS!")

# Structured message with subject
msg = SnsMessage(
    Message="Important notification",
    Subject="Alert",
    Id="msg-001"
)

# Dict message (auto-converted to JSON)
msg = SnsMessage(
    Message={"user_id": 123, "event": "signup"},
    Id="msg-002"
)

# FIFO topic message
msg = SnsMessage(
    Message="Order processed",
    MessageGroupId="orders",
    MessageDeduplicationId="order-12345"
)
__init__(Message: Dict | str, Id: str | None = None, Subject: str | None = None, MessageStructure: str | None = None, MessageAttributes: Dict | None = None, MessageDeduplicationId: str | None = None, MessageGroupId: str | None = None) None[source]#

Initialize an SNS message.

Parameters:
  • Message

    The message content to send. Can be either a string or dict.

    • String: Sent as-is to all transport protocols.

    • Dict: Automatically converted to JSON structure with “default” key.

    Constraints:
    • Except for SMS: UTF-8 strings, max 256 KB (262,144 bytes).

    • SMS: Max 140-160 characters depending on encoding. Messages longer than limit are split. Total SMS limit: 1,600 characters.

  • Id – Unique identifier for the message within a batch. Required for publish_batch() operation, ignored for single publish() calls.

  • Subject – Optional subject line for email endpoints. Also included in standard JSON messages delivered to other endpoints.

  • MessageStructure

    Set to “json” to send different messages per protocol.

    When set to “json”, the Message parameter must:
    • Be valid JSON

    • Contain at least a “default” key with a string value

    • Optionally contain protocol-specific keys (“http”, “sms”, “email”, etc.)

    Example JSON structure:

    {“default”: “Default message”, “sms”: “Short SMS”, “email”: “Detailed email”}

  • MessageAttributes – Custom message attributes for filtering and routing. Dict of attribute names to attribute values with DataType and StringValue/BinaryValue.

  • MessageDeduplicationId

    FIFO topics only. Deduplication token (up to 128 alphanumeric + punctuation). Messages with same ID within 5 minutes are treated as duplicates.

    If topic has ContentBasedDeduplication, this overrides auto-generated ID.

  • MessageGroupId – FIFO topics only (required). Message group tag (up to 128 alphanumeric + punctuation). Messages in same group are processed in FIFO order. Messages in different groups may process out of order.

as_dict() Dict[str, Any][source]#

Convert the message to a dictionary for SNS API calls. Automatically handles dict messages by converting them to JSON structure with “default” key. Removes None/empty values.

Returns:

Dictionary ready for SNS publish/publish_batch API.

Example

msg = SnsMessage(Message="Hello", Subject="Test", Id="msg-1")
payload = msg.as_dict()
# {"Id": "msg-1", "Message": "Hello", "Subject": "Test"}

msg2 = SnsMessage(Message={"data": 123})
payload2 = msg2.as_dict()
# {
#   "Message": '{"default": "{\"data\": 123}"}',
#   "MessageStructure": "json"
# }
class core_aws.services.sns.client.SnsClient(region: str, batch_size: int = 10, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS Simple Notification Service (SNS). This client provides methods for publishing messages to SNS topics, endpoints, and phone numbers. Supports both single and batch operations.

Example

# Initialize client
sns = SnsClient(region="us-east-1")

# Publish to topic
msg = SnsMessage(Message="Hello, SNS!", Subject="Notification")
sns.publish_message(msg, topic_arn="arn:aws:sns:us-east-1:123:my-topic")

# Publish multiple messages
messages = [
    SnsMessage(Message="Message 1", Id="msg-1"),
    SnsMessage(Message="Message 2", Id="msg-2")
]
sns.publish_batch("arn:aws:sns:us-east-1:123:my-topic", messages)
client: mypy_boto3_sns.client.SNSClient#
__init__(region: str, batch_size: int = 10, **kwargs: Any) None[source]#

Initialize the SNS client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • batch_size – Maximum messages per batch (default: 10, max: 10).

  • kwargs – Additional arguments passed to boto3.client().

batch_size: int#
create_topic(name: str, attributes: Dict[str, str] | None = None, tags: List[Dict[str, str]] | None = None, data_protection_policy: str | None = None) str[source]#

Create a new SNS topic. Creates a standard or FIFO topic based on name suffix (.fifo for FIFO topics). Returns the topic ARN for publishing messages and subscribing endpoints.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/create_topic.html

Parameters:
  • name – Topic name. Must be 1-256 characters, alphanumeric plus hyphens and underscores. For FIFO topics, name must end with “.fifo” suffix.

  • attributes

    Optional topic configuration attributes. Common attributes:

    • DeliveryPolicy: JSON string for delivery retry policy

    • DisplayName: Human-readable name for SMS sender (max 100 chars)

    • FifoTopic: “true” for FIFO topic (auto-set if name ends with .fifo)

    • ContentBasedDeduplication: “true” to enable content-based deduplication (FIFO only)

    • KmsMasterKeyId: AWS KMS key ID for encryption

    • Policy: JSON string for topic access policy

  • tags

    Optional list of tags to apply to the topic. Each tag is a dict with ‘Key’ and ‘Value’ fields. Maximum 50 tags per topic.

    Example: [{“Key”: “Environment”, “Value”: “Production”}]

  • data_protection_policy – Optional JSON string defining data protection policy for sensitive data scanning and redaction.

Returns:

The ARN (Amazon Resource Name) of the created topic.

Raises:

AwsClientException – If topic creation fails.

Example

sns = SnsClient(region="us-east-1")

# Create standard topic
topic_arn = sns.create_topic(name="my-notifications")
print(f"Created topic: {topic_arn}")

# Create FIFO topic with attributes
fifo_arn = sns.create_topic(
    name="my-orders.fifo",
    attributes={
        "FifoTopic": "true",
        "ContentBasedDeduplication": "true"
    },
    tags=[
        {"Key": "Environment", "Value": "Production"},
        {"Key": "Application", "Value": "OrderProcessing"}
    ]
)

# Create topic with KMS encryption
encrypted_arn = sns.create_topic(
    name="secure-topic",
    attributes={
        "KmsMasterKeyId": "alias/aws/sns",
        "DisplayName": "Secure Notifications"
    }
)

Note

  • Topic names are case-sensitive

  • Creating a topic with existing name is idempotent (returns existing ARN)

  • FIFO topics support message ordering and deduplication

  • Standard topics offer best-effort ordering and at-least-once delivery

subscribe_sqs_queue(topic_arn: str, queue_name: str, region: str | None = None, set_queue_policy: bool = True, attributes: Dict[str, str] | None = None) str[source]#

Subscribe an SQS queue to an SNS topic by queue name. This is a convenience method that handles the entire subscription process, including setting up the necessary queue policy to allow SNS to send messages.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/subscribe.html

Parameters:
  • topic_arn – ARN of the SNS topic to subscribe the queue to.

  • queue_name – Name of the SQS queue to subscribe. The queue must exist in the same region as the SNS client or in the specified region.

  • region – Optional AWS region where the SQS queue exists. If not specified, uses the same region as the SNS client.

  • set_queue_policy – If True (default), automatically sets the SQS queue policy to allow the SNS topic to send messages. Set to False if you want to manage the queue policy manually.

  • attributes

    Optional subscription attributes. Common attributes:

    • RawMessageDelivery: “true” to deliver raw messages without SNS envelope (default: “false”)

    • FilterPolicy: JSON string for message filtering based on attributes

    • RedrivePolicy: JSON string defining dead-letter queue for failed deliveries

Returns:

The subscription ARN for the created subscription.

Raises:

AwsClientException – If subscription fails or queue doesn’t exist.

Example

from core_aws.services.sns.client import SnsClient
from core_aws.services.sqs.client import SqsClient

sns = SnsClient(region="us-east-1")
sqs = SqsClient(region="us-east-1")

# Create topic and queue
topic_arn = sns.create_topic(name="notifications")
queue_url = sqs.create_queue(queue_name="notification-queue")

# Subscribe queue to topic (automatically sets queue policy)
subscription_arn = sns.subscribe_sqs_queue(
    topic_arn=topic_arn,
    queue_name="notification-queue"
)

print(f"Subscribed: {subscription_arn}")

# Subscribe with raw message delivery
subscription_arn = sns.subscribe_sqs_queue(
    topic_arn=topic_arn,
    queue_name="notification-queue",
    attributes={
        "RawMessageDelivery": "true"
    }
)

# Subscribe with message filtering
subscription_arn = sns.subscribe_sqs_queue(
    topic_arn=topic_arn,
    queue_name="notification-queue",
    attributes={
        "FilterPolicy": json.dumps({
            "event_type": ["order_placed", "order_shipped"]
        })
    }
)

Note

  • The SQS queue must exist before calling this method

  • By default, sets queue policy to allow SNS to send messages

  • Subscription is confirmed automatically for SQS endpoints

  • For cross-region subscriptions, specify the queue’s region

publish_message(message: SnsMessage, topic_arn: str | None = None, target_arn: str | None = None, phone_number: str | None = None) Dict[str, Any][source]#

Publish a message to an SNS topic, mobile endpoint, or phone number. Sends a message to one of three destinations: an SNS topic (fan-out), a mobile platform endpoint (push notification), or a phone number (SMS). Exactly one destination must be specified.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/publish.html

Parameters:
  • message – SnsMessage object containing the message and metadata.

  • topic_arn – ARN of the SNS topic to publish to. Required if target_arn and phone_number are not specified.

  • target_arn – ARN of mobile platform endpoint or device. Required if topic_arn and phone_number are not specified.

  • phone_number – Phone number in E.164 format (e.g., +14155551234). Required if topic_arn and target_arn are not specified.

Returns:

Dictionary containing:
  • MessageId (str): Unique identifier for the published message.

  • SequenceNumber (str, optional): For FIFO topics only.

Raises:

AwsClientException – If no destination specified or if publishing fails.

Example

sns = SnsClient(region="us-east-1")

# Publish to topic
msg = SnsMessage(Message="Hello!", Subject="Notification")
response = sns.publish_message(
    msg,
    topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic"
)
print(f"Published: {response['MessageId']}")

# Send SMS
sms_msg = SnsMessage(Message="Your code is: 123456")
sns.publish_message(sms_msg, phone_number="+14155551234")

# Publish to mobile endpoint
push_msg = SnsMessage(Message="New message!")
sns.publish_message(
    push_msg,
    target_arn="arn:aws:sns:us-east-1:123:endpoint/APNS/MyApp/abc123"
)
publish_batch(topic_arn: str, messages: List[SnsMessage]) Dict[str, List[Dict[str, Any]]][source]#

Publish multiple messages to an SNS topic in batches. Publishes up to 10 messages per API call. For FIFO topics, messages within a batch are published in order and deduplicated within/across batches for 5 minutes. Automatically handles larger message lists by batching into chunks of batch_size.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/publish_batch.html

Parameters:
  • topic_arn – ARN of the SNS topic to publish to.

  • messages – List of SnsMessage objects to publish. Each message must have a unique Id field within the batch.

Returns:

Dictionary containing aggregated results from all batches:

{
    "Successful": [
        {
            "Id": "string",
            "MessageId": "string",
            "SequenceNumber": "string"  # FIFO only
        }
    ],
    "Failed": [
        {
            "Id": "string",
            "Code": "string",
            "Message": "string",
            "SenderFault": True | False
        }
    ]
}

Raises:

AwsClientException – If batch publishing fails.

Example

sns = SnsClient(region="us-east-1")

# Create messages with unique IDs
messages = [
    SnsMessage(Message="First message", Id="msg-1"),
    SnsMessage(Message="Second message", Id="msg-2"),
    SnsMessage(Message="Third message", Id="msg-3")
]

# Publish batch
result = sns.publish_batch(
    topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
    messages=messages
)

print(f"Successful: {len(result['Successful'])}")
print(f"Failed: {len(result['Failed'])}")

# Check failures
for failure in result.get('Failed', []):
    print(f"Failed {failure['Id']}: {failure['Message']}")

Note

  • Each message must have a unique Id field

  • Max 10 messages per batch (enforced by batch_size)

  • For FIFO topics, MessageGroupId is required

  • Total payload size per batch must be < 256 KB

SQS Client#

AWS Simple Queue Service (SQS) client wrapper.

This module provides a high-level interface for interacting with AWS SQS, including message sending, receiving, deletion, and batch operations.

class core_aws.services.sqs.client.SqsClient(region: str, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS Simple Queue Service (SQS).

This client provides methods for sending, receiving, and deleting messages from SQS queues. It supports both single and batch operations, with automatic retry logic for failed deletions.

Example

# Initialize client
sqs = SqsClient(region="us-east-1")

# Get a queue by name
queue = sqs.get_queue_by_name("my-queue")

# Send a message
sqs.send_message(queue.url, "Hello, SQS!")

# Receive messages
messages = sqs.receive_messages(queue.url)
for msg in messages:
    print(msg["Body"])
client: mypy_boto3_sqs.client.SQSClient#
__init__(region: str, **kwargs: Any) None[source]#

Initialize the SQS client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • kwargs – Additional arguments passed to boto3.client().

resource: Any#
create_queue(queue_name: str, **kwargs: Any) str[source]#

Create a new SQS queue with the specified name and attributes. Creates a standard or FIFO queue. Queue names are limited to 80 characters including alphanumeric characters, hyphens (-), and underscores (_). FIFO queue names must end with the .fifo suffix.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/create_queue.html

Parameters:
  • queue_name – Name of the queue to create (max 80 characters).

  • kwargs

    Additional boto3 parameters:

    • Attributes (dict): Queue configuration attributes:

      • DelaySeconds (str): Delivery delay in seconds (0-900). Default: 0.

      • MaximumMessageSize (str): Max message size in bytes (1024-262144). Default: 262144.

      • MessageRetentionPeriod (str): Message retention in seconds (60-1209600). Default: 345600 (4 days).

      • ReceiveMessageWaitTimeSeconds (str): Long-polling wait time (0-20). Default: 0.

      • VisibilityTimeout (str): Visibility timeout in seconds (0-43200). Default: 30.

      • FifoQueue (str): ‘true’ for FIFO queue, ‘false’ for standard. Default: ‘false’.

      • ContentBasedDeduplication (str): ‘true’ to enable content-based deduplication (FIFO only).

      • KmsMasterKeyId (str): ID of AWS KMS key for server-side encryption.

      • KmsDataKeyReusePeriodSeconds (str): KMS key reuse period (60-86400). Default: 300.

      • DeduplicationScope (str): ‘messageGroup’ or ‘queue’ (high-throughput FIFO only).

      • FifoThroughputLimit (str): ‘perQueue’ or ‘perMessageGroupId’ (high-throughput FIFO only).

      • RedrivePolicy (str): JSON string defining dead-letter queue:

        {
            "deadLetterTargetArn": "arn:aws:sqs:region:account:dlq-name",
            "maxReceiveCount": "3"
        }
        
      • RedriveAllowPolicy (str): JSON string defining which source queues can use this as DLQ.

    • tags (dict): Key-value pairs to assign as queue tags.

Returns:

The queue URL string (e.g., “https://sqs.region.amazonaws.com/account/queue-name”).

Raises:

AwsClientException – If queue creation fails.

Example

sqs = SqsClient(region="us-east-1")

# Create a basic standard queue
queue_url = sqs.create_queue(queue_name="my-queue")
print(f"Queue URL: {queue_url}")

# Create a FIFO queue with custom attributes
queue_url = sqs.create_queue(
    queue_name="my-queue.fifo",
    Attributes={
        "FifoQueue": "true",
        "ContentBasedDeduplication": "true",
        "MessageRetentionPeriod": "86400",  # 1 day
        "VisibilityTimeout": "60"
    }
)

# Create a queue with dead-letter queue
queue_url = sqs.create_queue(
    queue_name="my-main-queue",
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:my-dlq",
            "maxReceiveCount": "5"
        }),
        "VisibilityTimeout": "30"
    },
    tags={
        "Environment": "production",
        "Application": "my-app"
    }
)
get_queue_by_name(queue_name: str, **kwargs: Any) mypy_boto3_sqs.service_resource.Queue[source]#

Retrieve an existing Amazon SQS queue by name. Returns a Queue resource object that can be used to interact with the queue. To access a queue owned by another AWS account, use the QueueOwnerAWSAccountId parameter.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/service-resource/get_queue_by_name.html

Parameters:
  • queue_name – The name of the queue.

  • kwargs

    Additional boto3 parameters:

    • QueueOwnerAWSAccountId (str):

      AWS account ID of the account that created the queue. Required for accessing queues in other accounts.

Returns:

sqs.Queue resource object with the following attributes:

  • url (str): Queue URL (https://sqs.<region>.amazonaws.com/<account>/QueueName)

  • dead_letter_source_queues: Dead letter queue information

  • meta: Queue metadata

  • attributes (dict): Queue attributes dictionary:

{
    'QueueArn': 'arn:aws:sqs:...',
    'ApproximateNumberOfMessages': '0',
    'ApproximateNumberOfMessagesNotVisible': '0',
    'ApproximateNumberOfMessagesDelayed': '0',
    'CreatedTimestamp': '1699539978',
    'LastModifiedTimestamp': '1699540164',
    'VisibilityTimeout': '300',
    'MaximumMessageSize': '262144',
    'MessageRetentionPeriod': '3600',
    'DelaySeconds': '60',
    ...
}

Raises:

AwsClientException – If the queue cannot be retrieved.

Example

sqs = SqsClient(region="us-east-1")

# Get a queue in same account
queue = sqs.get_queue_by_name("my-queue")
print(f"Queue URL: {queue.url}")

# Get a queue in another account
queue = sqs.get_queue_by_name(
    "cross-account-queue",
    QueueOwnerAWSAccountId="123456789012"
)
send_message(queue_url: str, message: str, **kwargs: Any) Dict[str, Any][source]#

Send a single message to an SQS queue. Delivers a message to the specified queue. Messages can contain XML, JSON, or unformatted text. Allowed Unicode characters: #x9, #xA, #xD, #x20 to #xD7FF, #xE000 to #xFFFD, #x10000 to #x10FFFF.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.send_message

Parameters:
  • queue_url – URL of the SQS queue.

  • message – Message body (up to 256 KB).

  • kwargs

    Additional boto3 parameters:

    • DelaySeconds (int): Delay before message becomes available (0-900 seconds).

    • MessageAttributes (dict): Custom attributes for the message.

    • MessageDeduplicationId (str): Deduplication ID for FIFO queues.

    • MessageGroupId (str): Message group ID for FIFO queues (required).

Returns:

Dictionary containing MessageId, MD5OfMessageBody, and SequenceNumber.

Raises:

AwsClientException – If message sending fails.

Example

sqs = SqsClient(region="us-east-1")

# Send a simple message
response = sqs.send_message(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    message="Hello, SQS!"
)
print(f"Message ID: {response['MessageId']}")

# Send with delay and attributes
sqs.send_message(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    message='{"event": "user_signup", "user_id": 123}',
    DelaySeconds=30,
    MessageAttributes={
        "EventType": {
            "StringValue": "UserSignup",
            "DataType": "String"
        }
    }
)
send_message_batch(queue_url: str, entries: List[Dict[str, Any]], **kwargs: Any) Dict[str, Any][source]#

Send multiple messages to an SQS queue in a single request. Delivers up to 10 messages to the specified queue in a single batch operation. For FIFO queues, messages within a batch are enqueued in the order they are sent.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/send_message_batch.html

Parameters:
  • queue_url – URL of the SQS queue.

  • entries

    List of message entries (max 10). Each entry must contain:

    • Id (str): Unique identifier for the message (within batch).

    • MessageBody (str): Message content (up to 256 KB).

    • DelaySeconds (int, optional): Message delay (0-900 seconds).

    • MessageAttributes (dict, optional): Custom message attributes.

    • MessageDeduplicationId (str, optional): For FIFO queues.

    • MessageGroupId (str, optional): For FIFO queues.

  • kwargs – Additional boto3 parameters.

Returns:

Dictionary containing Successful and Failed lists:

{
    "Successful": [
        {
            "Id": "string",
            "MessageId": "string",
            "MD5OfMessageBody": "string",
            "SequenceNumber": "string"
        }
    ],
    "Failed": [
        {
            "Id": "string",
            "SenderFault": True|False,
            "Code": "string",
            "Message": "string"
        }
    ]
}

Raises:

AwsClientException – If batch sending fails.

Example

sqs = SqsClient(region="us-east-1")

# Send multiple messages
response = sqs.send_message_batch(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    entries=[
        {
            "Id": "msg1",
            "MessageBody": "First message"
        },
        {
            "Id": "msg2",
            "MessageBody": "Second message",
            "DelaySeconds": 10
        },
        {
            "Id": "msg3",
            "MessageBody": "Third message"
        }
    ]
)
print(f"Successful: {len(response['Successful'])}")
print(f"Failed: {len(response['Failed'])}")
receive_messages(queue_url: str, max_number_of_msg: int = 10, **kwargs: Any) List[Dict[str, Any]][source]#

Receive one or more messages from an SQS queue. Retrieves up to 10 messages from the specified queue. Use the WaitTimeSeconds parameter to enable long-polling (recommended for reducing empty receives and API costs).

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/receive_message.html

Parameters:
  • queue_url – URL of the SQS queue.

  • max_number_of_msg – Maximum number of messages to retrieve (1-10). Default: 10.

  • kwargs

    Additional boto3 parameters:

    • AttributeNames (list): System attributes to retrieve (e.g., [‘All’, ‘ApproximateReceiveCount’]).

    • MessageAttributeNames (list): Custom message attributes to retrieve (e.g., [‘All’]).

    • VisibilityTimeout (int): Duration message is hidden after retrieval (0-43200 seconds).

    • WaitTimeSeconds (int): Long-polling wait time (0-20 seconds).

Returns:

List of message dictionaries:

[
    {
        "MessageId": "string",
        "ReceiptHandle": "string",
        "MD5OfBody": "string",
        "Body": "string",
        "Attributes": {
            "ApproximateReceiveCount": "1",
            "SentTimestamp": "1699539978000"
        },
        "MD5OfMessageAttributes": "string",
        "MessageAttributes": {
            "AttributeName": {
                "StringValue": "string",
                "BinaryValue": b"bytes",
                "DataType": "String"
            }
        }
    }
]

Raises:

AwsClientException – If message retrieval fails.

Example

sqs = SqsClient(region="us-east-1")

# Basic receive
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
)
for msg in messages:
    print(f"Message: {msg['Body']}")

# Long-polling with attributes
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    max_number_of_msg=5,
    WaitTimeSeconds=20,  # Long-poll for 20 seconds
    MessageAttributeNames=['All'],
    AttributeNames=['All']
)
retrieve_all_messages(queue_url: str, **kwargs: Any) Iterator[Dict[str, Any]][source]#

Retrieve all messages from a queue using an iterator. Continuously polls the queue and yields messages until the queue is empty. Useful for processing all messages in a queue.

Parameters:
  • queue_url – URL of the SQS queue.

  • kwargs – Additional parameters passed to receive_messages().

Returns:

Iterator yielding message dictionaries.

Example

sqs = SqsClient(region="us-east-1")

# Process all messages in queue
for message in sqs.retrieve_all_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    WaitTimeSeconds=5
):
    print(f"Processing: {message['Body']}")
    # Process the message...
    sqs.delete_message(queue_url, message['ReceiptHandle'])

Warning

This method will continue until the queue is empty. Make sure to delete messages after processing to avoid infinite loops.

delete_message(queue_url: str, receipt_handle: str, **kwargs: Any) Dict[str, Any][source]#

Delete a single message from an SQS queue. Deletes the specified message using its ReceiptHandle (not MessageId). Messages are automatically deleted after the retention period expires. Amazon SQS can delete a message even if it’s locked by visibility timeout.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/delete_message.html

Parameters:
  • queue_url – URL of the SQS queue.

  • receipt_handle – Receipt handle of the message (from receive_message).

  • kwargs – Additional boto3 parameters.

Returns:

Empty dictionary on success.

Raises:

AwsClientException – If message deletion fails.

Example

sqs = SqsClient(region="us-east-1")

# Receive and delete a message
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
)
for msg in messages:
    # Process the message...
    print(msg['Body'])

    # Delete after processing
    sqs.delete_message(
        queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
        receipt_handle=msg['ReceiptHandle']
    )
delete_message_batch(queue_url: str, entries: List[Dict[str, str]], **kwargs: Any) Dict[str, Any][source]#

Delete multiple messages from an SQS queue in a single request. Deletes up to 10 messages in a single batch operation. Each entry must include the message Id and ReceiptHandle.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/delete_message_batch.html

Parameters:
  • queue_url – URL of the SQS queue.

  • entries

    List of message entries to delete (max 10). Each entry must contain:

    • Id (str): Unique identifier for this deletion (within batch).

    • ReceiptHandle (str): Receipt handle from receive_message.

  • kwargs – Additional boto3 parameters.

Returns:

Dictionary containing Successful and Failed lists:

{
    "Successful": [
        {
            "Id": "string"
        }
    ],
    "Failed": [
        {
            "Id": "string",
            "SenderFault": True|False,
            "Code": "string",
            "Message": "string"
        }
    ]
}

Raises:

AwsClientException – If batch deletion fails.

Example

sqs = SqsClient(region="us-east-1")

# Receive messages
messages = sqs.receive_messages(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue"
)

# Delete in batch
delete_entries = [
    {
        "Id": msg['MessageId'],
        "ReceiptHandle": msg['ReceiptHandle']
    }
    for msg in messages
]

result = sqs.delete_message_batch(
    queue_url="https://sqs.us-east-1.amazonaws.com/123/my-queue",
    entries=delete_entries
)
print(f"Deleted: {len(result['Successful'])}")
print(f"Failed: {len(result['Failed'])}")
delete_messages(queue_url: str, entries: List[Dict[str, str]], retries: int = 3, **kwargs: Any) Dict[str, Any][source]#

It’s a wrapper over “delete_message_batch” and will delete all messages, if a message deletion fails it will be re-tried until success or until the maximum attempts are exhausted…

Parameters:
  • queue_url – The SQS queue url.

  • entries – The messages reference to delete.

  • retries – Number of re-tries in case of errors while deleting the messages.

  • kwargs – Other arguments to pass to delete_message_batch method.

Returns:

{
    "Successful": [{
        "Id": "string"
    }],
    "Failed": [{
        "Id": "string",
        "SenderFault": True|False,
        "Code": "string",
        "Message": "string"
    }]
}

SSM Client#

AWS Systems Manager (SSM) Parameter Store client wrapper.

This module provides a high-level interface for interacting with AWS Systems Manager Parameter Store, including support for retrieving parameters, secrets, and managing parameter hierarchies.

class core_aws.services.ssm.client.SsmClient(region: str, **kwargs: Any)[source]#

Bases: AwsClient

Client for AWS Systems Manager (SSM) Parameter Store.

This client provides methods for retrieving, storing, and managing parameters in AWS Systems Manager Parameter Store. It supports:

  • Individual parameter retrieval with decryption

  • Bulk parameter retrieval by path hierarchy

  • Integration with AWS Secrets Manager

  • Parameter creation and updates

  • Object attribute population from SSM parameters

Example

# Initialize client
ssm = SsmClient(region="us-east-1")

# Get a single parameter
param = ssm.get_parameter("/myapp/database/host")
print(param["Value"])

# Get all parameters under a path
for param in ssm.get_parameters_by_path("/myapp/"):
    print(f"{param['Name']}: {param['Value']}")

# Retrieve a secret from Secrets Manager
secret = ssm.get_secret("my-database-password")
client: mypy_boto3_ssm.client.SSMClient#
__init__(region: str, **kwargs: Any) None[source]#

Initialize the SSM client.

Parameters:
  • region – AWS region name (e.g., ‘us-east-1’, ‘eu-west-1’).

  • kwargs – Additional arguments passed to boto3.client().

get_secret(secret_id: str) str[source]#

Retrieve a secret value from AWS Secrets Manager via SSM Parameter Store. This method uses SSM’s special reference format to access secrets stored in AWS Secrets Manager: /aws/reference/secretsmanager/{secret_id}.

Parameters:

secret_id – The ID or name of the secret in Secrets Manager.

Returns:

The decrypted secret value as a string.

Raises:

AwsClientException – If the secret cannot be retrieved.

Example

ssm = SsmClient(region="us-east-1")
db_password = ssm.get_secret("prod/database/password")
print(f"Password: {db_password}")
get_parameter(parameter_name: str, with_decryption: bool = True) SSMParameter[source]#

Retrieve a parameter from SSM Parameter Store. Retrieves information about a single parameter including its value, type, version, and metadata. Supports automatic decryption of SecureString parameters.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/get_parameter.html

Parameters:
  • parameter_name – The fully qualified name of the parameter you want to query. Must include the complete hierarchy path (e.g., ‘/myapp/database/host’).

  • with_decryption – Return decrypted values for SecureString parameters. This flag is ignored for String and StringList parameter types. Default: True.

Returns:

SSMParameter dictionary containing parameter information.

Raises:

AwsClientException – If the parameter cannot be retrieved.

Example

ssm = SsmClient(region="us-east-1")

# Get a standard parameter
param = ssm.get_parameter("/myapp/config/api_url")
print(param["Value"])  # "https://api.example.com"

# Get a SecureString parameter (auto-decrypted)
secret_param = ssm.get_parameter("/myapp/secrets/api_key")
print(secret_param["Type"])  # "SecureString"

# Get without decryption
encrypted = ssm.get_parameter(
    "/myapp/secrets/api_key",
    with_decryption=False
)
Return Structure:
{
    "Name": "string",
    "Type": "String"|"StringList"|"SecureString",
    "Value": "string",
    "Version": 123,
    "Selector": "string",
    "SourceResult": "string",
    "LastModifiedDate": datetime(2015, 1, 1),
    "ARN": "string",
    "DataType": "string"
}
get_parameters_by_path(path: str, with_decryption: bool = True, **kwargs: Any) Iterator[SSMParameter][source]#

Retrieve all parameters under a specific path hierarchy. Recursively retrieves parameters from a path in SSM Parameter Store, automatically handling pagination. This is useful for fetching multiple related parameters at once (e.g., all database configuration parameters under /myapp/database/).

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm/client/get_parameters_by_path.html

Parameters:
  • path

    The parameter path hierarchy. Hierarchies start with a forward slash (/) and can have up to 15 levels. Examples:

    • /myapp/ - gets all parameters under myapp

    • /myapp/database/ - gets all database parameters

    • /prod/api/config/ - gets all production API configs

  • with_decryption – Retrieve all parameters with their values decrypted (for SecureString types). Default: True.

  • kwargs

    Additional boto3 parameters:

    • Recursive (bool):

      Retrieve all parameters within the hierarchy, not just immediate children. Default: False.

    • ParameterFilters (list):
      Filters to limit results. List of dicts with keys:
      • Key (string): Filter key (e.g., ‘Type’, ‘Name’)

      • Option (string): Filter option (e.g., ‘Equals’, ‘BeginsWith’)

      • Values (list): Values to match

    • MaxResults (int):

      Maximum number of items per API call (1-10). Pagination continues automatically regardless of this value.

Returns:

Iterator yielding SSMParameter dictionaries. Automatically handles pagination across multiple API calls.

Raises:

AwsClientException – If parameters cannot be retrieved.

Example

ssm = SsmClient(region="us-east-1")

# Get all parameters under a path
for param in ssm.get_parameters_by_path("/myapp/database/"):
    print(f"{param['Name']}: {param['Value']}")

# Get all parameters recursively
for param in ssm.get_parameters_by_path(
    "/myapp/",
    Recursive=True
):
    print(f"{param['Name']}: {param['Value']}")

# Filter by type
for param in ssm.get_parameters_by_path(
    "/myapp/",
    ParameterFilters=[
        {
            "Key": "Type",
            "Option": "Equals",
            "Values": ["SecureString"]
        }
    ]
):
    print(f"Secret: {param['Name']}")
Return Structure:

Each yielded item has the structure:

{
    "Name": "string",
    "Type": "String"|"StringList"|"SecureString",
    "Value": "string",
    "Version": 123,
    "Selector": "string",
    "SourceResult": "string",
    "LastModifiedDate": datetime(2015, 1, 1),
    "ARN": "string",
    "DataType": "string"
}
put_parameter(name: str, value: str, overwrite: bool = False, **kwargs: Any) Dict[str, Any][source]#

Create or update a parameter in SSM Parameter Store. Adds a new parameter or updates an existing one in Parameter Store. Supports standard, advanced, and intelligent-tiering parameters with optional encryption for SecureString types.

Reference:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ssm.html#SSM.Client.put_parameter

Parameters:
  • name – The fully qualified name of the parameter. Must include the complete hierarchy path with leading forward slash (/). Example: /Dev/DBServer/MySQL/db-string13

  • value – The parameter value. Standard parameters support up to 4 KB, advanced parameters support up to 8 KB.

  • overwrite – Overwrite an existing parameter. If False and parameter exists, raises an error. Default: False.

  • kwargs

    Additional boto3 parameters:

    • Description (str):

      Information about the parameter. Optional but recommended.

    • Type (str):

      Parameter type: ‘String’, ‘StringList’, or ‘SecureString’. Default: ‘String’.

    • KeyId (str):

      KMS key ID for encrypting SecureString parameters. Uses default AWS account key if not specified. Required for SecureString type.

    • AllowedPattern (str):

      Regex pattern to validate parameter value. Example: ^d+$ for numbers only.

    • Tags (list):

      Resource tags. List of dicts with ‘Key’ and ‘Value’.

    • Tier (str):

      ’Standard’, ‘Advanced’, or ‘Intelligent-Tiering’. Default: ‘Standard’.

    • DataType (str):

      Data type hint (e.g., ‘text’, ‘aws:ec2:image’).

Returns:

Dictionary containing version and tier information.

Raises:

AwsClientException – If parameter creation/update fails.

Example

ssm = SsmClient(region="us-east-1")

# Create a simple parameter
result = ssm.put_parameter(
    name="/myapp/config/api_url",
    value="https://api.example.com"
)
print(f"Version: {result['Version']}")

# Create a SecureString parameter
ssm.put_parameter(
    name="/myapp/secrets/api_key",
    value="secret-key-12345",
    Type="SecureString",
    Description="API key for external service"
)

# Update an existing parameter
ssm.put_parameter(
    name="/myapp/config/api_url",
    value="https://new-api.example.com",
    overwrite=True
)

# Create with tags
ssm.put_parameter(
    name="/myapp/config/version",
    value="1.0.0",
    Tags=[
        {"Key": "Environment", "Value": "Production"},
        {"Key": "Application", "Value": "MyApp"}
    ]
)
Return Structure:
{
    "Version": 123,
    "Tier": "Standard" | "Advanced" | "Intelligent-Tiering"
}
retrieve_parameters_from_ssm(ssm_path: str, parameters: List[str]) Dict[str, str][source]#

Retrieve specific parameters from SSM by matching suffixes. Retrieves all parameters under a path and returns a dictionary mapping parameter suffixes to their values. This is useful when you know the parameter suffixes but not the full paths.

Parameters:
  • ssm_path – SSM path to search for parameters (e.g., ‘/myapp/database/’).

  • parameters – List of parameter name suffixes to extract. Each suffix will be matched against the end of parameter names. Example: [‘host’, ‘port’, ‘username’]

Returns:

Dictionary mapping parameter suffixes to their values. Returns empty string for suffixes not found.

Example

ssm = SsmClient(region="us-east-1")

# Retrieve specific database parameters
db_params = ssm.retrieve_parameters_from_ssm(
    ssm_path="/myapp/database/",
    parameters=["host", "port", "username"]
)
# Result: {
#   "host": "db.example.com",
#   "port": "5432",
#   "username": "admin"
# }

print(f"Database: {db_params['host']}:{db_params['port']}")

Warning

This method has O(n*m) complexity where n=number of parameters in SSM path and m=number of suffixes. For large parameter sets, consider using get_parameters_by_path() directly and filtering results manually for better performance.

update_obj_attrs(obj: object, ssm_path: str) None[source]#

Update object attributes with values from SSM Parameter Store. Scans object attributes and replaces their values with matching SSM parameter values. The current attribute value is treated as the SSM parameter name to look up.

Parameters:
  • obj – Object whose attributes will be updated. Must have public (non-underscore) attributes.

  • ssm_path – SSM path to retrieve parameters from (e.g., ‘/myapp/config/’).

Example

class DatabaseConfig:
    def __init__(self):
        self.host = "/myapp/database/host"
        self.port = "/myapp/database/port"
        self.username = "/myapp/database/username"

ssm = SsmClient(region="us-east-1")
config = DatabaseConfig()

# Before: config.host = "/myapp/database/host"
ssm.update_obj_attrs(config, "/myapp/database/")
# After: config.host = "db.example.com"

print(config.host)  # "db.example.com"
print(config.port)  # "5432"

Warning

This method has O(n*m) complexity where n=number of SSM parameters and m=number of object attributes. Use with caution on large objects or parameter sets.

Note

  • Only public attributes (not starting with ‘_’) are updated

  • Methods and callable attributes are skipped

  • Attribute value must exactly match the SSM parameter name