ETLs

Base Interface

class core_aws.etls.base.IBaseEtlOnAWS(aws_region: str, ssm_parameters_path: str | None = None, attrs_to_update: List[str] | None = None, json_attrs: List[str] | None = None, ssm_endpoint_url: str | None = None, strict_ssm_validation: bool = False, **kwargs: Any)

Bases: IBaseETL, ABC

Base class for ETL tasks executed on AWS services.

It provides common features for ETL processes running on AWS, including:
  • Automatic parameter loading from AWS Systems Manager Parameter Store.

  • JSON attribute parsing.

  • Pre-configured AWS service clients (SSM, SQS, S3).

  • Validation of required attributes.

  • Configurable strict/lenient validation modes.

Example

from core_aws.etls.base import IBaseEtlOnAWS


class MyETL(IBaseEtlOnAWS):
    def __init__(self):
        # Attribute values are SSM parameter names; pre_processing() replaces them
        self.database_url = "/myapp/prod/database"
        self.api_config = '{"timeout": 30}'

        super().__init__(
            aws_region="us-east-1",
            ssm_parameters_path="/myapp/prod",
            attrs_to_update=["database_url"],
            json_attrs=["api_config"],
            strict_ssm_validation=True
        )

    def extract(self):
        # database_url now contains the actual value from SSM
        # api_config is now a dict: {"timeout": 30}
        pass

__init__(aws_region: str, ssm_parameters_path: str | None = None, attrs_to_update: List[str] | None = None, json_attrs: List[str] | None = None, ssm_endpoint_url: str | None = None, strict_ssm_validation: bool = False, **kwargs: Any) → None

Initialize the AWS ETL base class.

Parameters:
  • aws_region – AWS region (e.g., “us-east-1”, “eu-west-1”).

  • ssm_parameters_path – Path prefix under which the parameters are stored in SSM Parameter Store.

  • attrs_to_update – List of object attributes to update from SSM parameters. Attribute values should contain SSM parameter paths.

  • json_attrs – List of attributes that should be parsed as JSON (dicts, lists).

  • ssm_endpoint_url – Custom endpoint URL for SSM service (useful for testing/LocalStack).

  • strict_ssm_validation – If True, raises exceptions when expected SSM parameters are missing. If False (default), logs warnings instead.

  • kwargs – Additional arguments passed to the parent IBaseETL class.

Raises:
  • AttributeError – If specified attributes don’t exist on the object during pre_processing.

  • AwsClientException – If strict_ssm_validation=True and SSM parameters are missing.

pre_processing() → None

Pre-processing hook that validates attributes, updates them from SSM, and parses JSON.

This method is called before ETL execution and performs the following:
  1. Validates that attrs_to_update and json_attrs exist on the object

  2. Retrieves parameters from SSM Parameter Store

  3. Updates object attributes with SSM values

  4. Parses JSON string attributes into Python objects

Parameters:

kwargs – Additional arguments passed to the parent pre_processing method.

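Raises:
  • AttributeError – If specified attributes don’t exist on the object.

  • AwsClientException – If strict_ssm_validation=True and SSM parameters are missing.

Warns:

If any attribute in attrs_to_update does not exist, or if JSON parsing fails.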
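Step 4 of pre_processing() (parsing JSON string attributes) can be pictured with the minimal sketch below. It is not the library's code: parse_json_attrs and _Demo are hypothetical names, and leaving non-string values untouched and keeping the raw string on a parse error (with a logged warning) are assumptions based on the Warns entry above.

```python
import json
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


def parse_json_attrs(obj: Any, json_attrs: List[str]) -> None:
    """Hypothetical sketch: decode JSON strings stored on `obj` in place."""
    for name in json_attrs:
        value = getattr(obj, name)
        if not isinstance(value, str):
            # Already a dict/list (or some other object): nothing to parse.
            continue
        try:
            setattr(obj, name, json.loads(value))
        except json.JSONDecodeError as error:
            # Lenient behavior (assumed): keep the raw string and warn.
            logger.warning("Could not parse %r as JSON: %s", name, error)


class _Demo:  # hypothetical stand-in for an ETL instance
    def __init__(self) -> None:
        self.api_config = '{"timeout": 30}'
        self.broken = "{not json"
        self.already_parsed = {"retries": 3}


demo = _Demo()
parse_json_attrs(demo, ["api_config", "broken", "already_parsed"])
print(demo.api_config)  # {'timeout': 30}
```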

_update_parameters(attrs: List[str]) → None

Retrieve parameters from SSM Parameter Store and update object attributes. Fetches all parameters under ssm_parameters_path and updates object attributes where the attribute value matches a parameter name in SSM.

Parameters:

attrs – List of attribute names to update from SSM.

Raises:

AwsClientException

If strict_ssm_validation=True and:
  • ssm_parameters_path is not configured

  • No parameters are found at the specified path

  • Required parameters are missing

Warns:

If strict_ssm_validation=False and any of the above conditions occur.
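A minimal sketch of how parameters under ssm_parameters_path might be fetched while applying the strict/lenient rules listed above. It uses the real boto3 SSM paginator for get_parameters_by_path, but fetch_parameters, _fail, and MissingSsmParameters (a stand-in for AwsClientException) are hypothetical, and passing Recursive=True and WithDecryption=True is an assumption about how the library calls SSM.

```python
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class MissingSsmParameters(Exception):
    """Hypothetical stand-in for the library's AwsClientException."""


def _fail(message: str, strict: bool) -> List[Dict[str, Any]]:
    # Strict mode raises; lenient mode logs a warning and returns no parameters.
    if strict:
        raise MissingSsmParameters(message)
    logger.warning(message)
    return []


def fetch_parameters(
    ssm_client: Any,
    path: Optional[str],
    strict: bool = False,
) -> List[Dict[str, Any]]:
    """Fetch every parameter under `path`, following pagination."""
    if not path:
        return _fail("ssm_parameters_path is not configured", strict)

    parameters: List[Dict[str, Any]] = []
    paginator = ssm_client.get_paginator("get_parameters_by_path")
    # Assumption: the library searches recursively and decrypts SecureStrings.
    for page in paginator.paginate(Path=path, Recursive=True, WithDecryption=True):
        parameters.extend(page.get("Parameters", []))

    if not parameters:
        return _fail(f"No parameters found under {path}", strict)
    return parameters


# With a real client (requires AWS credentials; boto3.client also accepts
# endpoint_url=... for LocalStack):
# import boto3
# params = fetch_parameters(boto3.client("ssm", region_name="us-east-1"), "/myapp/prod")
```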

_update_attributes(attributes: List[str], parameters: List[SSMParameter]) → None

Update object attributes using values from SSM Parameter Store. Matches object attribute values (which contain SSM parameter paths) against the parameter names retrieved from SSM, and replaces the attribute values with the actual parameter values.

Parameters:
  • attributes – List of attribute names to update.

  • parameters – List of SSM parameter dictionaries containing Name and Value.

Raises:

AwsClientException – If strict_ssm_validation=True and parameters are missing.

Warns:

If strict_ssm_validation=False and parameters are missing.

Example

Given an SSM parameter:

[{
    "Name": "/path/service/user",
    "Value": "user_name"
}]

If the object has a “user” attribute with value “/path/service/user”, the attribute will be updated to “user_name”.

_impls: Dict[str, Type[Self]] = {}

Bucket Based Interface

class core_aws.etls.bucket_based.IBaseEtlOnAwsBucket(bucket: str | None = None, prefix: str | None = None, archive_bucket: str | None = None, archive_prefix: str | None = None, error_bucket: str | None = None, error_prefix: str | None = None, **kwargs: Any)

Bases: IBaseEtlFromFile, IBaseEtlOnAWS, ABC

Base class for ETL processes that retrieve and process files from AWS S3.

This class provides a framework for ETL processes that:
  • Download files from an S3 bucket.

  • Process files locally.

  • Archive successfully processed files to an archive bucket.

  • Move failed files to an error bucket.

  • Clean up local and source files after processing.

The workflow for each file:
  1. Download from the source bucket to a local temp folder.

  2. Process the file locally (via process_local_file).

  3. On success: copy to archive_bucket/archive_prefix.

  4. On error: copy to error_bucket/error_prefix.

  5. Delete from the source bucket.

  6. Delete the local temp file.

Example

from typing import Any

from core_aws.etls.bucket_based import IBaseEtlOnAwsBucket


class MyS3ETL(IBaseEtlOnAwsBucket):
    def process_local_file(self, local_path: str, *args: Any, **kwargs: Any) -> None:
        # Process the downloaded file
        with open(local_path, 'r') as f:
            data = f.read()
            # ... process data ...

etl = MyS3ETL(
    aws_region="us-east-1",  # required by the IBaseEtlOnAWS parent
    bucket="my-source-bucket",
    prefix="incoming/",
    archive_bucket="my-archive-bucket",
    archive_prefix="processed/",
    error_bucket="my-error-bucket",
    error_prefix="failed/"
)
etl.execute()

__init__(bucket: str | None = None, prefix: str | None = None, archive_bucket: str | None = None, archive_prefix: str | None = None, error_bucket: str | None = None, error_prefix: str | None = None, **kwargs: Any) → None

Initialize the S3-based ETL process.

Parameters:
  • bucket – Source S3 bucket containing files to process.

  • prefix – Path/prefix within the source bucket to retrieve files from.

  • archive_bucket – S3 bucket for archiving successfully processed files (optional).

  • archive_prefix – Path/prefix to use when archiving successful files (optional).

  • error_bucket – S3 bucket for archiving files that failed processing (optional).

  • error_prefix – Path/prefix to use when archiving failed files (optional).

  • kwargs – Additional arguments passed to parent classes.

Note

  • Files are always deleted from the source bucket after processing, whether processing succeeded or failed.

  • If archive_bucket (or error_bucket) is not provided, successfully processed (or failed) files are not archived anywhere before being deleted.

_execute(*args: Any, **kwargs: Any) → int

Execute the ETL process for every file under the configured bucket and prefix.

Parameters:
  • args – Positional arguments passed to parent _execute.

  • kwargs – Keyword arguments passed to parent _execute.

Returns:

Number of files processed.

get_paths() → Iterator[str]

Retrieve S3 object keys (file paths) from the source bucket. Yields object keys from the configured bucket and prefix that will be downloaded and processed.

Parameters:
  • args – Positional arguments (unused).

  • kwargs – Keyword arguments (unused).

Returns:

Iterator of S3 object keys (file paths).
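One way get_paths() could be implemented is with the real boto3 list_objects_v2 paginator, which transparently follows S3's limit of 1,000 keys per response. iter_object_keys is a hypothetical name, and skipping keys that end in "/" (zero-byte folder placeholders) is an assumption, not documented behavior of this class.

```python
from typing import Any, Iterator


def iter_object_keys(s3_client: Any, bucket: str, prefix: str = "") -> Iterator[str]:
    """Yield every object key under bucket/prefix, following pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no matching objects has no "Contents" key at all.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # Assumption: skip zero-byte "folder" placeholders such as "incoming/".
            if not key.endswith("/"):
                yield key


# With a real client (requires AWS credentials):
# import boto3
# keys = list(iter_object_keys(boto3.client("s3"), "my-source-bucket", "incoming/"))
```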

process_file(path: str) → None

Download, process, archive, and clean up a single file from S3.

This method orchestrates the complete workflow for a single file:
  1. Downloads the file from S3 to the local temp folder.

  2. Calls process_local_file() to process it.

  3. Archives it to the archive or error bucket, based on the processing result.

  4. Deletes the file from the source S3 bucket.

  5. Deletes the local temp file.

Parameters:
  • path – S3 object key (file path) to process.

  • args – Positional arguments (unused).

  • kwargs – Keyword arguments (unused).

abstractmethod process_local_file(local_path: str, *args: Any, **kwargs: Any) → None

Process a downloaded file from the local filesystem. This abstract method must be implemented by subclasses to define the actual processing logic for each file.

Parameters:
  • local_path – Absolute path to the downloaded file in the temp folder.

  • args – Additional positional arguments.

  • kwargs – Additional keyword arguments.

Raises:

Exception – Any exception raised will cause the file to be archived to the error bucket instead of the archive bucket.

Example

import json
from typing import Any


def process_local_file(self, local_path: str, *args: Any, **kwargs: Any) -> None:
    with open(local_path, 'r') as f:
        data = json.load(f)
    # Process data... (self.database stands in for your own storage client)
    self.database.insert(data)

_impls: Dict[str, Type[Self]] = {}

SQS Based Interface

class core_aws.etls.sqs_based.IBaseEtlOnAwsSQS(queue_name: str, **kwargs: Any)

Bases: IBaseEtlOnAWS, ABC

Base class for ETL processes that retrieve and process messages from AWS SQS.

This class provides a framework for ETL processes that:
  • Poll messages from an SQS queue in batches.

  • Process each message individually.

  • Delete successfully processed messages from the queue.

  • Continue processing until the queue is empty.

  • Handle message processing errors gracefully.

The workflow:
  1. Retrieve a batch of messages from SQS.

  2. Process each message via process_message().

  3. Track successfully processed messages.

  4. Delete successful messages from the queue.

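  5. Repeat until the queue is empty.

Failed messages remain in the queue and become visible again once the queue’s visibility timeout expires. If the queue has a redrive policy, messages that keep failing are moved to its dead-letter queue after the configured maximum number of receives.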
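The workflow above can be sketched with the boto3 SQS client API (receive_message and delete_message_batch, both real calls). The class fetches a queue object in pre_processing(), so it may use the boto3 resource API instead; drain_queue and its parameters are hypothetical names, and this sketch does not inspect the Failed list that delete_message_batch can return.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def drain_queue(
    sqs_client: Any,
    queue_url: str,
    process_message: Callable[[Dict[str, Any]], None],
    max_messages: int = 10,
    wait_time_seconds: int = 1,
) -> int:
    """Receive, process, and delete messages until a receive returns none."""
    processed = 0
    while True:
        # 1. Retrieve a batch (SQS returns at most 10 messages per call).
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_time_seconds,
        )
        messages = response.get("Messages", [])
        if not messages:
            return processed  # 5. An empty receive is treated as "queue empty".

        # 2-3. Process each message, remembering only the successes.
        succeeded: List[Dict[str, str]] = []
        for index, message in enumerate(messages):
            try:
                process_message(message)
            except Exception:
                # Not deleted: the message reappears after the visibility timeout.
                logger.exception("Failed to process %s", message.get("MessageId"))
                continue
            succeeded.append({"Id": str(index), "ReceiptHandle": message["ReceiptHandle"]})

        # 4. Delete the successful messages in a single batch call.
        if succeeded:
            sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=succeeded)
            processed += len(succeeded)
```

With a real queue, the URL can be resolved from the queue name with boto3.client("sqs").get_queue_url(QueueName="my-data-queue")["QueueUrl"].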

Example

import json
from typing import Any, Dict

from core_aws.etls.sqs_based import IBaseEtlOnAwsSQS


class MyDataProcessor(IBaseEtlOnAwsSQS):
    def process_message(self, message: Dict[str, Any]) -> None:
        body = json.loads(message["Body"])
        # Process the message data (self.database stands in for your own client)
        self.database.insert(body["data"])

processor = MyDataProcessor(
    queue_name="my-data-queue",
    aws_region="us-east-1"
)
processor.execute()

__init__(queue_name: str, **kwargs: Any) → None

Initialize the SQS-based ETL process.

Parameters:
  • queue_name – Name of the SQS queue to process messages from.

  • kwargs – Additional arguments passed to parent IBaseEtlOnAWS class.

pre_processing() → None

Pre-processing hook to retrieve the SQS queue reference. Fetches the queue object from SQS using the configured queue name. Must be called before _execute().

Parameters:

kwargs – Additional arguments passed to parent pre_processing.

Raises:

Exception – If queue cannot be found or accessed.

_execute(*args: Any, **kwargs: Any) → int

Retrieve and process messages from the SQS queue in batches. Continuously polls the queue for messages, processes each one, and deletes successfully processed messages. Continues until the queue is empty.

Parameters:
  • args – Positional arguments (unused).

  • kwargs – Additional arguments (e.g., MaxNumberOfMessages, WaitTimeSeconds).

Returns:

Total number of successfully processed messages.

abstractmethod process_message(message: Dict[str, Any]) → None

Process a single SQS message. This abstract method must be implemented by subclasses to define the actual message processing logic.

Parameters:

message

The SQS message dictionary containing:
  • MessageId: Unique message identifier.

  • ReceiptHandle: Token for deleting the message.

  • Body: The message content (often JSON string).

  • Attributes: System attributes (e.g., SentTimestamp, ApproximateReceiveCount), present only when requested.

  • MessageAttributes: Custom attributes set by the message sender.

Raises:

Exception – Any exception raised will cause the message to remain in the queue for retry according to the queue’s visibility timeout and redrive policy.

Example

import json
from typing import Any, Dict


def process_message(self, message: Dict[str, Any]) -> None:
    # Parse message body
    body = json.loads(message["Body"])

    # Extract and validate data (validate() stands in for your own check)
    data = body["data"]
    validate(data)

    # Process the data
    self.database.insert(data)
    self.info(f"Processed message: {message['MessageId']}")

_impls: Dict[str, Type[Self]] = {}