Source code for core_aws.etls.bucket_based

# -*- coding: utf-8 -*-

import os
from abc import ABC, abstractmethod
from typing import Any, Iterator, Optional

from core_etl.file_based import IBaseEtlFromFile

from core_aws.etls.base import IBaseEtlOnAWS


[docs] class IBaseEtlOnAwsBucket(IBaseEtlFromFile, IBaseEtlOnAWS, ABC): """ Base class for ETL processes that retrieve and process files from AWS S3. This class provides a framework for ETL processes that: - Download files from an S3 bucket - Process files locally - Archive successfully processed files to an archive bucket - Move failed files to an error bucket - Clean up local and source files after processing The workflow for each file: 1. Download from source bucket to local temp folder 2. Process the file locally (via process_local_file) 3. On success: copy to archive_bucket/archive_prefix 4. On error: copy to error_bucket/error_prefix 5. Delete from source bucket 6. Delete local temp file Example: .. code-block:: python class MyS3ETL(IBaseEtlOnAwsBucket): def process_local_file(self, local_path: str): # Process the downloaded file with open(local_path, 'r') as f: data = f.read() # ... process data ... etl = MyS3ETL( bucket="my-source-bucket", prefix="incoming/", archive_bucket="my-archive-bucket", archive_prefix="processed/", error_bucket="my-error-bucket", error_prefix="failed/" ) etl.execute() .. """
[docs] def __init__( self, bucket: Optional[str] = None, prefix: Optional[str] = None, archive_bucket: Optional[str] = None, archive_prefix: Optional[str] = None, error_bucket: Optional[str] = None, error_prefix: Optional[str] = None, **kwargs: Any ) -> None: """ Initialize the S3-based ETL process. :param bucket: Source S3 bucket containing files to process. :param prefix: Path/prefix within the source bucket to retrieve files from. :param archive_bucket: S3 bucket for archiving successfully processed files (optional). :param archive_prefix: Path/prefix to use when archiving successful files (optional). :param error_bucket: S3 bucket for archiving files that failed processing (optional). :param error_prefix: Path/prefix to use when archiving failed files (optional). :param kwargs: Additional arguments passed to parent classes. Note: - Files are always deleted from the source bucket after processing. - If archive_bucket/error_bucket are not provided, files won't be archived. - Source files are deleted regardless of archiving success/failure. """ super().__init__(**kwargs) self.bucket = bucket self.archive_bucket = archive_bucket self.error_bucket = error_bucket self.prefix = prefix self.archive_prefix = archive_prefix self.error_prefix = error_prefix
[docs] def _execute(self, *args: Any, **kwargs: Any) -> int: """ Execute the ETL process for all files in the S3 bucket. :param args: Positional arguments passed to parent _execute. :param kwargs: Keyword arguments passed to parent _execute. :return: Number of files processed. """ self.info(f"Retrieving files from bucket: {self.bucket}, path: {self.prefix}...") return super()._execute(*args, **kwargs)
[docs] def get_paths(self) -> Iterator[str]: """ Retrieve S3 object keys (file paths) from the source bucket. Yields object keys from the configured bucket and prefix that will be downloaded and processed. :param args: Positional arguments (unused). :param kwargs: Keyword arguments (unused). :return: Iterator of S3 object keys (file paths). """ if not self.bucket: raise ValueError("bucket must be configured") if not self.prefix: raise ValueError("prefix must be configured") for rec in self.s3_client.list_all_objects(self.bucket, self.prefix): yield rec["Key"]
[docs] def process_file(self, path: str) -> None: """ Download, process, archive, and clean up a single file from S3. This method orchestrates the complete workflow for a single file: 1. Downloads the file from S3 to local temp folder 2. Calls process_local_file() to process it 3. Archives to success/error bucket based on processing result 4. Deletes the file from the source S3 bucket 5. Deletes the local temp file :param path: S3 object key (file path) to process. :param args: Positional arguments (unused). :param kwargs: Keyword arguments (unused). """ if not self.bucket: raise ValueError("bucket must be configured") self.info(f"Downloading file: {path}...") file_name = os.path.basename(path) local_path = self.s3_client.download_file( local_path=f"{self.temp_folder}/{file_name}", bucket=self.bucket, key=path) self.info("Downloaded!") final_bucket: Optional[str] = None final_path: Optional[str] = None try: self.info(f"Processing file: {file_name}.") self.process_local_file(local_path) self.info("Processed!") except Exception as error: self.error(f"Error processing file: {error}.") final_bucket = self.error_bucket final_path = self.error_prefix else: final_bucket = self.archive_bucket final_path = self.archive_prefix finally: # Archive to appropriate bucket (success or error) if final_bucket and final_path: self.info(f"Archiving into bucket: {final_bucket}, path: {final_path}...") try: self.s3_client.copy( copy_source={ "Bucket": self.bucket, "Key": path }, bucket=final_bucket, key=f"{final_path.rstrip('/')}/{file_name}", ) self.info("Archived successfully!") except Exception as archive_error: self.error(f"Failed to archive file: {archive_error}") else: self.info("Skipping archiving (no destination bucket configured).") # Deleting from source bucket... try: self.info(f"Deleting file: {path} from bucket: {self.bucket}...") self.s3_client.delete_object(self.bucket, key=path) self.info("File deleted from S3!") except Exception as delete_error: self.error(f"Failed to delete file from S3: {delete_error}") # Deleting local temporal file... try: if os.path.exists(local_path): os.remove(local_path) self.info(f"Local temp file deleted: {local_path}") except Exception as local_delete_error: self.error(f"Failed to delete local file: {local_delete_error}")
[docs] @abstractmethod def process_local_file(self, local_path: str, *args: Any, **kwargs: Any) -> None: """ Process a downloaded file from the local filesystem. This abstract method must be implemented by subclasses to define the actual processing logic for each file. :param local_path: Absolute path to the downloaded file in the temp folder. :param args: Additional positional arguments. :param kwargs: Additional keyword arguments. :raises Exception: Any exception raised will cause the file to be archived to the error bucket instead of the archive bucket. Example: .. code-block:: python def process_local_file(self, local_path: str): with open(local_path, 'r') as f: data = json.load(f) # Process data... self.database.insert(data) .. """