diff options
| author | HastarTara <156914815+HastarTara@users.noreply.github.com> | 2024-08-28 09:52:41 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-28 09:52:41 +0100 |
| commit | 65b1ca8b41488eba4db9bcb9773d857493e87b25 (patch) | |
| tree | 37afc6c5d5562ecdb714f553eecd35bae99fdf6b /src/transform_lambda.py | |
| parent | 9b0f24df880160811b4541b8996c06adeed28245 (diff) | |
| parent | 2eea1e16a44e2a03a5d4227e0da07abea8fdc121 (diff) | |
| download | de-project-bentley-65b1ca8b41488eba4db9bcb9773d857493e87b25.tar.gz de-project-bentley-65b1ca8b41488eba4db9bcb9773d857493e87b25.zip | |
Merge pull request #103 from ajschofield/temp_branch_for_pr
Temp branch for pr
Diffstat (limited to 'src/transform_lambda.py')
| -rw-r--r-- | src/transform_lambda.py | 217 |
1 files changed, 0 insertions, 217 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py deleted file mode 100644 index 93b2284..0000000 --- a/src/transform_lambda.py +++ /dev/null @@ -1,217 +0,0 @@ -import json -import boto3 -import re -import logging -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -from dataframes import * -from botocore.exceptions import ClientError -from pg8000.native import Connection, InterfaceError -from datetime import datetime - - -class DBConnectionException(Exception): - """Wraps pg8000.native Error or DatabaseError.""" - - def __init__(self, e): - """Initialise with provided error message.""" - self.message = str(e) - super().__init__(self.message) - - -logger = logging.getLogger(__name__) - -logging.basicConfig( - format="{asctime} - {levelname} - {message}", - style="{", - datefmt="%Y-%m-%d %H:%M", - level=logging.DEBUG, -) - -logging.getLogger("botocore").setLevel(logging.WARNING) - -TABLES = [ - "sales_order", - "transaction", - "payment", - "counterparty", - "address", - "staff", - "purchase_order", - "department", - "currency", - "design", - "payment_type", -] - - -def lambda_handler(event, context): - db = None - - try: - db = connect_to_database() - bucket = bucket_name("transform") - - existing_s3_files = list_existing_s3_files(bucket) - - dict_of_df = read_from_s3_subfolder_to_df( - TABLES, bucket_name("extract"), client=boto3.client("s3") - ) - - immutable_df_dict = { - "dim_counterparty": create_dim_counterparty(dict_of_df), - "dim_date": create_dim_date(dict_of_df), - "dim_location": create_dim_location(dict_of_df), - "dim_staff": create_dim_staff(dict_of_df), - "dim_design": create_dim_design(dict_of_df), - } - - mutable_df_dict = { - "fact_sales_order": create_fact_sales_order(dict_of_df), - "fact_purchase_order": create_fact_purchase_orders(dict_of_df), - "fact_payment": create_fact_payment(dict_of_df), - "dim_currency": create_dim_currency(dict_of_df), - } - - status = process_to_parquet_and_upload_to_s3( - existing_s3_files, immutable_df_dict, mutable_df_dict, bucket - ) - - if not status["uploaded"]: - logger.info("No dataframes written to the bucket.") - return { - "statusCode": 204, - "body": json.dumps("No files where uploaded."), - } - - return { - "statusCode": 200, - "body": json.dumps( - f"""Parquet files processed for {', '.join(status['uploaded'])} and uploaded successfully.{ - 'The following tables were not uploaded: '+', '.join([status['not_uploaded']]) if status['not_uploaded'] else ''}""" - ), - } - - except Exception as e: - logger.error(f"Error: {e}", exc_info=True) - return {"statusCode": 500, "body": json.dumps("Internal server error.")} - finally: - if db: - db.close() - - -def process_to_parquet_and_upload_to_s3( - existing_s3_files, - immutable_df_dict, - mutable_df_dict, - bucket, - client=boto3.client("s3"), -): - status = {"uploaded": [], "not_uploaded": []} - - for table_name, df in immutable_df_dict.items(): - if table_name in existing_s3_files: - status["not_uploaded"].append(table_name) - else: - parquet_file = df.to_parquet( - f"{table_name}.parquet", engine="pyarrow" - ) # or fastparquet - # changed parquet_file variable to the file name - client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") - status["uploaded"].append(table_name) - - for table_name, df in mutable_df_dict.items(): - s3_key = datetime.strftime( - datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" - ) - parquet_file = df.to_parquet( - f"{table_name}.parquet", engine="pyarrow" - ) # or fastparquet - client.upload_file(f"{table_name}.parquet", bucket, s3_key) - status["uploaded"].append(table_name) - - return status - - -def retrieve_secrets(): - secret_name = "bentley-secrets" - region_name = "eu-west-2" - - # Create a Secrets Manager client - session = boto3.session.Session() - client = session.client(service_name="secretsmanager", region_name=region_name) - - try: - get_secret_value_response = client.get_secret_value(SecretId=secret_name) - except ClientError as e: - logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") - raise e - except KeyError: - logger.error(f"Secret {secret_name} does not contain a SecretString") - raise ValueError(f"Secret {secret_name} does not contain a SecretString") - - return get_secret_value_response["SecretString"] - - -def connect_to_database() -> Connection: - try: - secrets = json.loads(retrieve_secrets()) - host = secrets["host"] - port = secrets["port"] - user = secrets["user"] - password = secrets["password"] - database = secrets["database"] - - return Connection( - database=database, user=user, password=password, host=host, port=port - ) - except InterfaceError as i: - logger.error(f"Interface error: {i}") - raise DBConnectionException("Failed to connect to database") - - -def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")): - table_dfs = {} - for table in tables: - response = client.list_objects_v2(Bucket=bucket, Prefix=table) - list_of_keys = [ - "s3://" + bucket + "/" + object["Key"] for object in response["Contents"] - ] - list_of_df = [pd.read_csv(key) for key in list_of_keys] - table_dfs[table] = pd.concat(list_of_df) - return table_dfs - - -def bucket_name(bucket_prefix, client=boto3.client("s3")): - response = client.list_buckets() - bucket_filter = [ - bucket["Name"] - for bucket in response["Buckets"] - if bucket_prefix in bucket["Name"] - ] - - return bucket_filter[0] - - -def list_existing_s3_files(bucket_name, client=boto3.client("s3")): - logging.info("Listing existing S3 files") - - try: - response = client.list_objects_v2(Bucket=bucket_name) - - if "Contents" in response: - existing_files = [obj["Key"] for obj in response["Contents"]] - else: - logger.error("The bucket is empty") - return [] # changed from None to [] so it is an iterable - - except ClientError as e: - logger.error(f"Error listing S3 objects: {e}") - raise e - - return existing_files - - -if __name__ == "__main__": - lambda_handler({}, "") |
