ETLs#
Base Interface#
- class core_aws.etls.base.IBaseEtlOnAWS(aws_region: str, ssm_parameters_path: str | None = None, attrs_to_update: List[str] | None = None, json_attrs: List[str] | None = None, ssm_endpoint_url: str | None = None, strict_ssm_validation: bool = False, **kwargs: Any)[source]#
Bases: IBaseETL, ABC
Base class for ETL tasks executed on AWS.
- Provides common features that can be used in ETL processes, including:
Automatic parameter loading from AWS Systems Manager Parameter Store.
JSON attribute parsing.
Pre-configured AWS service clients (SSM, SQS, S3).
Validation of required attributes.
Configurable strict/lenient validation modes.
Example
    from core_aws.etls.base import IBaseEtlOnAWS

    class MyETL(IBaseEtlOnAWS):
        def __init__(self):
            # Attributes hold SSM paths or JSON strings until pre_processing runs
            self.database_url = "/myapp/prod/database"
            self.api_config = '{"timeout": 30}'
            super().__init__(
                aws_region="us-east-1",
                ssm_parameters_path="/myapp/prod",
                attrs_to_update=["database_url"],
                json_attrs=["api_config"],
                strict_ssm_validation=True
            )

        def extract(self):
            # database_url now contains the actual value from SSM
            # api_config is now a dict: {"timeout": 30}
            pass
- __init__(aws_region: str, ssm_parameters_path: str | None = None, attrs_to_update: List[str] | None = None, json_attrs: List[str] | None = None, ssm_endpoint_url: str | None = None, strict_ssm_validation: bool = False, **kwargs: Any) None[source]#
Initialize the AWS ETL base class.
- Parameters:
aws_region – AWS Region (e.g., “us-east-1”, “eu-west-1”).
ssm_parameters_path – Path where parameters can be found in SSM Parameter Store.
attrs_to_update – List of object attributes to update from SSM parameters. Attribute values should contain SSM parameter paths.
json_attrs – List of attributes that should be parsed as JSON (dicts, lists).
ssm_endpoint_url – Custom endpoint URL for SSM service (useful for testing/LocalStack).
strict_ssm_validation – If True, raises exceptions when expected SSM parameters are missing. If False (default), logs warnings instead.
kwargs – Additional arguments passed to the parent IBaseETL class.
- Raises:
AttributeError – If specified attributes don’t exist on the object during pre_processing.
AwsClientException – If strict_ssm_validation=True and SSM parameters are missing.
- pre_processing() None[source]#
Pre-processing hook that validates attributes, updates them from SSM, and parses JSON.
- This method is called before ETL execution and performs the following:
Validates that attrs_to_update and json_attrs exist on the object
Retrieves parameters from SSM Parameter Store
Updates object attributes with SSM values
Parses JSON string attributes into Python objects
- Parameters:
kwargs – Additional arguments passed to the parent pre_processing method.
- Raises:
AttributeError – If json_attrs don’t exist on the object.
AwsClientException – If strict_ssm_validation=True and parameters are missing/not found.
- Warns:
If attrs_to_update don’t exist or if JSON parsing fails.
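The JSON parsing step described above can be sketched in plain Python. This is a minimal illustration, not the library's code: `parse_json_attrs` and the `Config` class are hypothetical names, and the behaviour (raise on a missing attribute, warn on a parse failure and keep the original string) is an assumption based on the Raises and Warns sections.

```python
import json
import warnings
from typing import Any, Iterable


def parse_json_attrs(obj: Any, json_attrs: Iterable[str]) -> None:
    """Replace JSON strings stored on ``obj`` with the parsed Python objects."""
    for name in json_attrs:
        # A missing attribute is treated as a hard error (assumed from "Raises").
        if not hasattr(obj, name):
            raise AttributeError(f"{type(obj).__name__} has no attribute {name!r}")

        raw = getattr(obj, name)
        if not isinstance(raw, str):
            continue  # not a JSON string, so leave the value untouched

        try:
            setattr(obj, name, json.loads(raw))
        except json.JSONDecodeError as error:
            # A parse failure only warns; the original string is kept (assumed from "Warns").
            warnings.warn(f"Could not parse {name!r} as JSON: {error}")


class Config:
    """Hypothetical object carrying one valid and one invalid JSON attribute."""

    api_config = '{"timeout": 30}'
    broken = "{not json"


config = Config()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    parse_json_attrs(config, ["api_config", "broken"])

print(config.api_config, config.broken, len(caught))
```

After the call, `api_config` holds a dict while `broken` keeps its original string and produces a single warning.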
- _update_parameters(attrs: List[str]) None[source]#
Retrieve parameters from SSM Parameter Store and update object attributes. Fetches all parameters under ssm_parameters_path and updates object attributes where the attribute value matches a parameter name in SSM.
- Parameters:
attrs – List of attribute names to update from SSM.
- Raises:
AwsClientException – If strict_ssm_validation=True and:
ssm_parameters_path is not configured
No parameters found at the specified path
Required parameters are missing
- Warns:
If strict_ssm_validation=False and issues are detected.
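As a rough sketch of the fetch-and-validate behaviour described above, the function below walks every page of the SSM `get_parameters_by_path` operation through a boto3-style client and either raises or warns, depending on a strict flag. It assumes the library talks to SSM in a similar way, which the documentation does not confirm. `fetch_parameters`, `MissingParametersError` (a stand-in for AwsClientException), and the in-memory `FakeSSMClient` are hypothetical names used only for illustration.

```python
import warnings
from typing import Any, Dict, List, Optional


class MissingParametersError(Exception):
    """Hypothetical stand-in for the library's AwsClientException."""


def fetch_parameters(ssm_client: Any, path: Optional[str], strict: bool = False) -> List[Dict[str, Any]]:
    """Return every parameter found under ``path``, raising or warning on problems."""

    def report(message: str) -> None:
        # Strict mode raises; lenient mode only warns.
        if strict:
            raise MissingParametersError(message)
        warnings.warn(message)

    if not path:
        report("ssm_parameters_path is not configured")
        return []

    # get_parameters_by_path is paginated, so every page has to be visited.
    paginator = ssm_client.get_paginator("get_parameters_by_path")
    parameters: List[Dict[str, Any]] = []
    for page in paginator.paginate(Path=path, Recursive=True, WithDecryption=True):
        parameters.extend(page.get("Parameters", []))

    if not parameters:
        report(f"No parameters found under {path!r}")
    return parameters


class _FakePaginator:
    def __init__(self, pages: List[Dict[str, Any]]) -> None:
        self._pages = pages

    def paginate(self, **kwargs: Any):
        return iter(self._pages)


class FakeSSMClient:
    """In-memory stub so the sketch runs without AWS credentials."""

    def __init__(self, pages: List[Dict[str, Any]]) -> None:
        self._pages = pages

    def get_paginator(self, operation_name: str) -> _FakePaginator:
        return _FakePaginator(self._pages)


client = FakeSSMClient([
    {"Parameters": [{"Name": "/myapp/prod/database", "Value": "postgres://db"}]},
])
found = fetch_parameters(client, "/myapp/prod", strict=True)
print(found)
```

With a real boto3 client (`boto3.client("ssm", region_name=...)`), the same function body should work unchanged, since only `get_paginator` and `paginate` are used.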
- _update_attributes(attributes: List[str], parameters: List[SSMParameter]) None[source]#
Update object attributes using values from SSM Parameter Store. Matches object attribute values (which contain SSM parameter paths) against the parameter names retrieved from SSM, and replaces the attribute values with the actual parameter values.
- Parameters:
attributes – List of attribute names to update.
parameters – List of SSM parameter dictionaries containing Name and Value.
- Raises:
AwsClientException – If strict_ssm_validation=True and parameters are missing.
- Warns:
If strict_ssm_validation=False and parameters are missing.
Example
Given an SSM parameter:
[{ "Name": "/path/service/user", "Value": "user_name" }]
If the object has a “user” attribute with value “/path/service/user”, the attribute will be updated to “user_name”.
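The matching rule in this example can be sketched as a dictionary lookup. This is an illustrative approximation rather than the library's implementation: `update_attributes` and `Service` are hypothetical names, and parameters are modelled as plain dictionaries with Name and Value keys.

```python
from typing import Any, Dict, List


def update_attributes(obj: Any, attributes: List[str], parameters: List[Dict[str, str]]) -> List[str]:
    """Swap SSM paths stored on ``obj`` for their values; return the names left unresolved."""
    values_by_name = {parameter["Name"]: parameter["Value"] for parameter in parameters}
    missing = []
    for attribute in attributes:
        path = getattr(obj, attribute)  # the attribute currently holds an SSM path
        if path in values_by_name:
            setattr(obj, attribute, values_by_name[path])
        else:
            missing.append(attribute)  # the caller decides whether to raise or warn
    return missing


class Service:
    """Hypothetical object whose attributes start out as SSM parameter paths."""

    user = "/path/service/user"
    password = "/path/service/password"


service = Service()
missing = update_attributes(
    service,
    ["user", "password"],
    [{"Name": "/path/service/user", "Value": "user_name"}],
)
print(service.user, missing)
```

Returning the unresolved names keeps the strict/lenient decision outside the matching loop; that is a design choice of this sketch, not a documented property of the class.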
Bucket Based Interface#
- class core_aws.etls.bucket_based.IBaseEtlOnAwsBucket(bucket: str | None = None, prefix: str | None = None, archive_bucket: str | None = None, archive_prefix: str | None = None, error_bucket: str | None = None, error_prefix: str | None = None, **kwargs: Any)[source]#
Bases: IBaseEtlFromFile, IBaseEtlOnAWS, ABC
Base class for ETL processes that retrieve and process files from AWS S3.
- This class provides a framework for ETL processes that:
Download files from an S3 bucket.
Process files locally.
Archive successfully processed files to an archive bucket.
Move failed files to an error bucket.
Clean up local and source files after processing.
- The workflow for each file:
1. Download from the source bucket to a local temp folder.
2. Process the file locally (via process_local_file).
3. On success: copy to archive_bucket/archive_prefix.
4. On error: copy to error_bucket/error_prefix.
5. Delete from the source bucket.
6. Delete the local temp file.
Example
    from core_aws.etls.bucket_based import IBaseEtlOnAwsBucket

    class MyS3ETL(IBaseEtlOnAwsBucket):
        def process_local_file(self, local_path: str):
            # Process the downloaded file
            with open(local_path, 'r') as f:
                data = f.read()
            # ... process data ...

    etl = MyS3ETL(
        aws_region="us-east-1",  # required by IBaseEtlOnAWS; forwarded through **kwargs
        bucket="my-source-bucket",
        prefix="incoming/",
        archive_bucket="my-archive-bucket",
        archive_prefix="processed/",
        error_bucket="my-error-bucket",
        error_prefix="failed/"
    )
    etl.execute()
- __init__(bucket: str | None = None, prefix: str | None = None, archive_bucket: str | None = None, archive_prefix: str | None = None, error_bucket: str | None = None, error_prefix: str | None = None, **kwargs: Any) None[source]#
Initialize the S3-based ETL process.
- Parameters:
bucket – Source S3 bucket containing files to process.
prefix – Path/prefix within the source bucket to retrieve files from.
archive_bucket – S3 bucket for archiving successfully processed files (optional).
archive_prefix – Path/prefix to use when archiving successful files (optional).
error_bucket – S3 bucket for archiving files that failed processing (optional).
error_prefix – Path/prefix to use when archiving failed files (optional).
kwargs – Additional arguments passed to parent classes.
Note
Files are always deleted from the source bucket after processing, regardless of whether archiving succeeds or fails.
If archive_bucket or error_bucket is not provided, the corresponding files are not archived.
- _execute(*args: Any, **kwargs: Any) int[source]#
Execute the ETL process for all files in the S3 bucket.
- Parameters:
args – Positional arguments passed to parent _execute.
kwargs – Keyword arguments passed to parent _execute.
- Returns:
Number of files processed.
- get_paths() Iterator[str][source]#
Retrieve S3 object keys (file paths) from the source bucket. Yields object keys from the configured bucket and prefix that will be downloaded and processed.
- Parameters:
args – Positional arguments (unused).
kwargs – Keyword arguments (unused).
- Returns:
Iterator of S3 object keys (file paths).
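One way to produce such an iterator is to page through the S3 `list_objects_v2` operation with a boto3-style client. The sketch below assumes that approach; the library's actual listing logic is not shown in this documentation. `iter_object_keys` and the in-memory `FakeS3Client` are hypothetical names.

```python
from typing import Any, Dict, Iterator, List


def iter_object_keys(s3_client: Any, bucket: str, prefix: str = "") -> Iterator[str]:
    """Yield every object key under ``prefix`` in ``bucket``, one page at a time."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when a page has no matching objects.
        for item in page.get("Contents", []):
            yield item["Key"]


class _FakePaginator:
    def __init__(self, pages: List[Dict[str, Any]]) -> None:
        self._pages = pages

    def paginate(self, **kwargs: Any):
        return iter(self._pages)


class FakeS3Client:
    """In-memory stub so the sketch runs without AWS credentials."""

    def __init__(self, pages: List[Dict[str, Any]]) -> None:
        self._pages = pages

    def get_paginator(self, operation_name: str) -> _FakePaginator:
        return _FakePaginator(self._pages)


client = FakeS3Client([
    {"Contents": [{"Key": "incoming/a.csv"}, {"Key": "incoming/b.csv"}]},
    {"KeyCount": 0},  # an empty page carries no "Contents" entry
])
keys = list(iter_object_keys(client, "my-source-bucket", "incoming/"))
print(keys)
```

Yielding keys lazily means processing can start before the whole bucket has been listed.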
- process_file(path: str) None[source]#
Download, process, archive, and clean up a single file from S3.
- This method orchestrates the complete workflow for a single file:
1. Downloads the file from S3 to a local temp folder.
2. Calls process_local_file() to process it.
3. Archives to the success or error bucket based on the processing result.
4. Deletes the file from the source S3 bucket.
5. Deletes the local temp file.
- Parameters:
path – S3 object key (file path) to process.
args – Positional arguments (unused).
kwargs – Keyword arguments (unused).
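The per-file workflow above can be approximated with three boto3-style S3 client calls: `download_file`, `copy_object`, and `delete_object`. This is a sketch under that assumption, not the method's real body. `process_one_file`, `FakeS3Client`, and `fail_on_bad` are hypothetical names, and the fake client keeps objects in a dictionary so the example runs without AWS.

```python
import os
import tempfile
from typing import Any, Callable, Dict, Optional, Tuple


def process_one_file(
    s3_client: Any,
    bucket: str,
    key: str,
    process_local_file: Callable[[str], None],
    archive_bucket: Optional[str] = None,
    archive_prefix: str = "",
    error_bucket: Optional[str] = None,
    error_prefix: str = "",
) -> bool:
    """Download, process, archive, and clean up one object; return True on success."""
    name = os.path.basename(key)
    # The temporary directory (and the local copy in it) is removed on exit.
    with tempfile.TemporaryDirectory() as temp_dir:
        local_path = os.path.join(temp_dir, name)
        s3_client.download_file(bucket, key, local_path)
        try:
            process_local_file(local_path)
            succeeded, target_bucket, target_prefix = True, archive_bucket, archive_prefix
        except Exception:
            succeeded, target_bucket, target_prefix = False, error_bucket, error_prefix

        # Archiving is optional: skip the copy when no target bucket is configured.
        if target_bucket:
            s3_client.copy_object(
                Bucket=target_bucket,
                Key=f"{target_prefix}{name}",
                CopySource={"Bucket": bucket, "Key": key},
            )
        # The source object is removed whether processing succeeded or not.
        s3_client.delete_object(Bucket=bucket, Key=key)
    return succeeded


class FakeS3Client:
    """In-memory stub keyed by (bucket, key)."""

    def __init__(self, objects: Dict[Tuple[str, str], bytes]) -> None:
        self.objects = dict(objects)

    def download_file(self, Bucket: str, Key: str, Filename: str) -> None:
        with open(Filename, "wb") as handle:
            handle.write(self.objects[(Bucket, Key)])

    def copy_object(self, Bucket: str, Key: str, CopySource: Dict[str, str]) -> None:
        self.objects[(Bucket, Key)] = self.objects[(CopySource["Bucket"], CopySource["Key"])]

    def delete_object(self, Bucket: str, Key: str) -> None:
        self.objects.pop((Bucket, Key), None)


def fail_on_bad(local_path: str) -> None:
    """Hypothetical processor that rejects files whose content is b"bad"."""
    with open(local_path, "rb") as handle:
        if handle.read() == b"bad":
            raise ValueError("unreadable payload")


client = FakeS3Client({("src", "incoming/a.csv"): b"ok", ("src", "incoming/b.csv"): b"bad"})
results = [
    process_one_file(
        client, "src", key, fail_on_bad,
        archive_bucket="archive", archive_prefix="processed/",
        error_bucket="errors", error_prefix="failed/",
    )
    for key in ("incoming/a.csv", "incoming/b.csv")
]
print(results, sorted(client.objects))
```

In this run the first file lands in the archive bucket, the second in the error bucket, and the source bucket ends up empty.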
- abstractmethod process_local_file(local_path: str, *args: Any, **kwargs: Any) None[source]#
Process a downloaded file from the local filesystem. This abstract method must be implemented by subclasses to define the actual processing logic for each file.
- Parameters:
local_path – Absolute path to the downloaded file in the temp folder.
args – Additional positional arguments.
kwargs – Additional keyword arguments.
- Raises:
Exception – Any exception raised will cause the file to be archived to the error bucket instead of the archive bucket.
Example
    import json

    def process_local_file(self, local_path: str):
        with open(local_path, 'r') as f:
            data = json.load(f)
        # Process data...
        self.database.insert(data)
SQS Based Interface#
- class core_aws.etls.sqs_based.IBaseEtlOnAwsSQS(queue_name: str, **kwargs: Any)[source]#
Bases: IBaseEtlOnAWS, ABC
Base class for ETL processes that retrieve and process messages from AWS SQS.
- This class provides a framework for ETL processes that:
Poll messages from an SQS queue in batches.
Process each message individually.
Delete successfully processed messages from the queue.
Continue processing until the queue is empty.
Handle message processing errors gracefully.
- The workflow:
Retrieve a batch of messages from SQS.
Process each message via process_message().
Track successfully processed messages.
Delete successful messages from the queue.
Repeat until queue is empty.
Failed messages remain in the queue and will be retried based on the queue’s visibility timeout and retry policy.
Example
    import json
    from typing import Dict

    from core_aws.etls.sqs_based import IBaseEtlOnAwsSQS

    class MyDataProcessor(IBaseEtlOnAwsSQS):
        def process_message(self, message: Dict) -> None:
            body = json.loads(message["Body"])
            # Process the message data
            self.database.insert(body["data"])

    processor = MyDataProcessor(
        queue_name="my-data-queue",
        aws_region="us-east-1"
    )
    processor.execute()
- __init__(queue_name: str, **kwargs: Any) None[source]#
Initialize the SQS-based ETL process.
- Parameters:
queue_name – Name of the SQS queue to process messages from.
kwargs – Additional arguments passed to parent IBaseEtlOnAWS class.
- pre_processing() None[source]#
Pre-processing hook to retrieve the SQS queue reference. Fetches the queue object from SQS using the configured queue name. Must be called before _execute().
- Parameters:
kwargs – Additional arguments passed to parent pre_processing.
- Raises:
Exception – If queue cannot be found or accessed.
- _execute(*args: Any, **kwargs: Any) int[source]#
Retrieve and process messages from the SQS queue in batches. Continuously polls the queue for messages, processes each one, and deletes successfully processed messages. Continues until the queue is empty.
- Parameters:
args – Positional arguments (unused).
kwargs – Additional arguments (e.g., MaxNumberOfMessages, WaitTimeSeconds).
- Returns:
Total number of successfully processed messages.
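The polling loop described above can be sketched with the boto3-style client calls `receive_message` and `delete_message_batch`. The documentation mentions fetching a queue object, so the library may use a different interface; treat this as an approximation. `drain_queue`, `FakeSQSClient`, and `handle` are hypothetical names. The fake keeps received messages "in flight" to mimic the visibility timeout, so a failed message is not redelivered immediately.

```python
import json
from typing import Any, Callable, Dict, List


def drain_queue(
    sqs_client: Any,
    queue_url: str,
    process_message: Callable[[Dict[str, Any]], None],
    max_messages: int = 10,
    wait_seconds: int = 0,
) -> int:
    """Poll until the queue returns no messages; return how many were processed."""
    processed = 0
    while True:
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_seconds,
        )
        messages = response.get("Messages", [])  # the key is absent on an empty receive
        if not messages:
            return processed

        succeeded = []
        for message in messages:
            try:
                process_message(message)
            except Exception:
                continue  # leave the message in the queue so SQS can redeliver it
            succeeded.append(message)

        if succeeded:
            sqs_client.delete_message_batch(
                QueueUrl=queue_url,
                Entries=[
                    {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
                    for message in succeeded
                ],
            )
            processed += len(succeeded)


class FakeSQSClient:
    """In-memory stub; received messages stay invisible until deleted."""

    def __init__(self, bodies: List[str]) -> None:
        self.visible = [
            {"MessageId": str(index), "ReceiptHandle": f"rh-{index}", "Body": body}
            for index, body in enumerate(bodies)
        ]
        self.in_flight: Dict[str, Dict[str, Any]] = {}

    def receive_message(self, QueueUrl: str, MaxNumberOfMessages: int = 1, WaitTimeSeconds: int = 0):
        batch = self.visible[:MaxNumberOfMessages]
        self.visible = self.visible[MaxNumberOfMessages:]
        self.in_flight.update({message["ReceiptHandle"]: message for message in batch})
        return {"Messages": batch} if batch else {}

    def delete_message_batch(self, QueueUrl: str, Entries: List[Dict[str, str]]):
        for entry in Entries:
            self.in_flight.pop(entry["ReceiptHandle"], None)
        return {"Successful": [{"Id": entry["Id"]} for entry in Entries], "Failed": []}


seen: List[int] = []


def handle(message: Dict[str, Any]) -> None:
    """Hypothetical handler: fails on bodies that are not valid JSON."""
    seen.append(json.loads(message["Body"])["data"])


client = FakeSQSClient(['{"data": 1}', "not json", '{"data": 2}'])
total = drain_queue(client, "https://example.invalid/my-data-queue", handle, max_messages=2)
print(total, seen, list(client.in_flight))
```

Deleting in batches after each receive keeps the number of API calls low while ensuring that only successfully processed messages leave the queue.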
- abstractmethod process_message(message: Dict[str, Any]) None[source]#
Process a single SQS message. This abstract method must be implemented by subclasses to define the actual message processing logic.
- Parameters:
message – The SQS message dictionary containing:
MessageId: Unique message identifier.
ReceiptHandle: Token for deleting the message.
Body: The message content (often JSON string).
Attributes: Message attributes.
MessageAttributes: Custom message attributes.
- Raises:
Exception – Any exception raised will cause the message to remain in the queue for retry according to the queue’s visibility timeout and redrive policy.
Example
    import json
    from typing import Any, Dict

    def process_message(self, message: Dict[str, Any]) -> None:
        # Parse message body
        body = json.loads(message["Body"])

        # Extract and validate data (validate is an application-defined helper)
        data = body["data"]
        validate(data)

        # Process the data
        self.database.insert(data)
        self.info(f"Processed message: {message['MessageId']}")