aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/load_lambda.py174
-rw-r--r--src/transform_lambda/dataframes.py8
-rw-r--r--src/transform_lambda/transform_lambda.py2
-rw-r--r--tests/test_transform_lambda.py2
4 files changed, 115 insertions, 71 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 272cb8c..cdcf105 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -7,7 +7,8 @@ import logging
import json
import traceback
from sqlalchemy import create_engine
-
+from datetime import datetime as dt
+import re
logger = logging.getLogger(__name__)
@@ -15,10 +16,10 @@ logging.basicConfig(
format="{asctime} - {levelname} - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
- level=logging.DEBUG,
+ level=logging.INFO,
)
-
-logging.getLogger("botocore").setLevel(logging.INFO)
+# logging.getLogger("botocore").setLevel(logging.INFO)
+# logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
def lambda_handler(event, context):
@@ -38,10 +39,10 @@ def lambda_handler(event, context):
),
}
else:
- logger.error(f"error")
+ logger.error(f"error", exc_info=True)
return {"error"}
except Exception as e:
- logger.error({e})
+ logger.error({e}, exc_info=True)
return {"statusCode": 500, "body": {e}}
@@ -58,10 +59,10 @@ def retrieve_secrets(client=None, secret_name=None):
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
print(get_secret_value_response)
except ClientError as e:
- logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}")
+ logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True)
raise e
except KeyError:
- logger.error(f"Secret {secret_name} does not contain a SecretString")
+ logger.error(f"Secret {secret_name} does not contain a SecretString", exc_info=True)
raise ValueError(f"Secret {secret_name} does not contain a SecretString")
return get_secret_value_response["SecretString"]
@@ -86,7 +87,7 @@ def connect_to_db_and_return_engine(sm_secret=None):
engine = create_engine(conn_str)
return engine
except Exception as e:
- logger.error(f"Interface error: {e}")
+ logger.error(f"Interface error: {e}", exc_info=True)
raise RuntimeError("Failed to create database engine")
@@ -97,7 +98,7 @@ def get_transform_bucket(client=None):
try:
response = client.list_buckets()
except ClientError as e:
- logger.error(f"Error listing S3 buckets: {e}")
+ logger.error(f"Error listing S3 buckets: {e}", exc_info=True)
raise RuntimeError("Error listing S3 buckets")
transform_bucket_filter = [
@@ -107,7 +108,7 @@ def get_transform_bucket(client=None):
]
if not transform_bucket_filter:
- logger.error("No transform bucket found")
+ logger.error("No transform bucket found", exc_info=True)
raise ValueError("No transform bucket found")
return transform_bucket_filter[0]
@@ -117,41 +118,78 @@ def get_transform_bucket(client=None):
# convert parquet files into dataframes
# return a dictionary of dataframes with name as key, and dataframe object as value
+def get_latest_timestamp(existing_files):
+ if existing_files:
+ all_datetimes = []
+ for file_name in existing_files:
+ match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name)
+ if match:
+ datetime_str = "".join(match.group(1, 2))
+ all_datetimes.append(
+ dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
+ )
+ return max(all_datetimes) if all_datetimes else dt.min
+ return existing_files
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
+ mutable_df_dict = [
+ "dim_currency",
+ "fact_sales_order",
+ "fact_purchase_order",
+ "fact_payment"
+
+ ]
+
try:
if client is None:
client = boto3.client("s3")
if bucket_name is None:
bucket_name = get_transform_bucket()
files = client.list_objects_v2(Bucket=bucket_name)
-
+
dfs = {}
if "Contents" in files:
- for file in files["Contents"]:
- file_key = file["Key"]
+ s3_key_list = [file["Key"]for file in files["Contents"]]
+ immutables_l = []
+ mutables_d = {prefix:[] for prefix in mutable_df_dict}
+ for tab, s3_key in mutables_d.items():
+ for file in s3_key_list:
+ if tab in file:
+ s3_key.append(file)
+ elif "2024" not in file:
+ immutables_l.append(file)
+ else:
+ continue
+ immutables_l = list(set(immutables_l))
+ print(mutables_d,'mutables_d')
+ latest_s3_keys = []
+ for k,v in mutables_d.items():
+ latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"))
+ print(latest_s3_keys,'latest')
+ print(immutables_l,'immutables_l')
+ for file_key in latest_s3_keys+immutables_l:
try:
file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
df = parquet_file.read().to_pandas()
- print("df", df)
- print("type", type(df))
- print(df.columns)
- dfs[file_key] = df
+ df_without_nulls = df.dropna()
+ #print("df_without_nulls", df_without_nulls)
+ #print("type", type(df_without_nulls))
+ #print(df_without_nulls.columns)
+ dfs[file_key] = df_without_nulls
except ClientError as e:
- logger.error(f"Unable to retrieve S3 object {file_key}: {e}")
+ logger.error(f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True)
except Exception as e:
- logger.error(f"Unable to process file {file_key}: {e}")
+ logger.error(f"Unable to process file {file_key}: {e}", exc_info=True)
else:
- logger.error(f"No files found in {bucket_name}.")
+ logger.error(f"No files found in {bucket_name}.", exc_info=True)
return {}
except ValueError as value_error:
- logger.error(f"Unable to list objects: {value_error}")
+ logger.error(f"Unable to list objects: {value_error}", exc_info=True)
raise
except ClientError as client_error:
- logger.error(f"Unable to list objects: {client_error}")
+ logger.error(f"Unable to list objects: {client_error}", exc_info=True)
raise
- print()
return dfs
@@ -160,53 +198,57 @@ def upload_dfs_to_database():
dict_of_dfs = convert_parquet_files_to_dfs()
db_engine = connect_to_db_and_return_engine()
immutable_df_dict = [
- "dim_counterparty.parquet",
- "dim_date.parquet", # this needs to be mutable
- "dim_location.parquet",
- "dim_staff.parquet",
- "dim_design.parquet"
+ # #"dim_counterparty.parquet",
+ # "dim_date.parquet", # this needs to be mutable
+ # "dim_location.parquet",
+ # "dim_staff.parquet",
+ # "dim_design.parquet"
]
mutable_df_dict = [
+ "dim_currency",
"fact_sales_order",
"fact_purchase_order",
- "fact_payment",
- "dim_currency"
+ "fact_payment"
+
]
-
- for file_name, df in dict_of_dfs.items():
- print(df)
- if file_name in immutable_df_dict:
- table_name = file_name.split(".")[0]
- print(table_name, "<<<<<")
- try:
- df.to_sql(
- table_name,
- con=db_engine,
- schema="project_team_2",
- if_exists="append",
- index=False,
- )
- upload_status["uploaded"].append(table_name)
- except Exception as e:
- logger.error(f"Error uploading dataframe {file_name} to database: {e}")
- raise
- elif file_name.rsplit("_", 1)[0] in mutable_df_dict:
- table_name = file_name.rsplit("_", 1)[0]
- try:
- df.to_sql(
- table_name,
- con=db_engine,
- schema="project_team_2",
- if_exists="append",
- index=False,
- )
- upload_status["uploaded"].append(table_name)
- except Exception as e:
- logger.error(f"Error uploading dataframe {file_name} to database: {e}")
- raise
- else:
- upload_status["not_uploaded"].append(file_name)
- logger.error(f"{file_name} does not correspond with table in database")
+ with db_engine.begin() as connection:
+ for file_name, df in dict_of_dfs.items():
+ print(df.dtypes, "dtypes")
+ print(df.head())
+ if file_name in immutable_df_dict:
+ table_name = file_name.split(".")[0]
+ print(table_name, "<<<<<")
+ try:
+ df.to_sql(
+ table_name,
+ con=connection,
+ schema="project_team_2",
+ if_exists="append",
+ index=False,
+ )
+ upload_status["uploaded"].append(table_name)
+ print(upload_status)
+ except Exception as e:
+ logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True)
+ raise
+ elif file_name.split("/")[0] in mutable_df_dict:
+ table_name = file_name.split("/")[0]
+ print(table_name, "<<<<<<<TABLE NAME")
+ try:
+ df.to_sql(
+ table_name,
+ con=connection,
+ schema="project_team_2",
+ if_exists="append",
+ index=False,
+ )
+ upload_status["uploaded"].append(table_name)
+ except Exception as e:
+ logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True)
+ raise
+ else:
+ upload_status["not_uploaded"].append(file_name)
+ logger.error(f"{file_name} does not correspond with table in database", exc_info=True)
db_engine.dispose()
return upload_status
diff --git a/src/transform_lambda/dataframes.py b/src/transform_lambda/dataframes.py
index bf0556b..e89a6b2 100644
--- a/src/transform_lambda/dataframes.py
+++ b/src/transform_lambda/dataframes.py
@@ -18,7 +18,7 @@ import requests
# no test, same as fact_payment
def create_fact_sales_order(dict_of_df):
- df_sales = dict_of_df["sales_order"]
+ df_sales = dict_of_df["sales_order"].rename(columns={"staff_id": "sales_staff_id"})
df_sales.index.name = "sales_record_id"
df_sales["created_date"] = df_sales["created_at"].astype("datetime64[ns]").dt.date
@@ -44,7 +44,7 @@ def create_fact_sales_order(dict_of_df):
"created_time",
"last_updated_date",
"last_updated_time",
- "staff_id",
+ "sales_staff_id",
"counterparty_id",
"units_sold",
"unit_price",
@@ -55,7 +55,7 @@ def create_fact_sales_order(dict_of_df):
"agreed_delivery_location_id"
],
]
- fact_sales.rename(columns={"staff_id": "sales_staff_id"}).reset_index(inplace=True)
+ fact_sales.reset_index(inplace=True)
return fact_sales
@@ -253,6 +253,8 @@ def create_dim_currency(dict_of_df, names=scrape_currency_names()):
df_cur, names, left_on="currency_code", right_on="currency_code", how="left"
)
dim_currency.drop_duplicates(inplace=True)
+ dim_currency.astype({"currency_name": "string", "currency_code": "string"})
+ print(dim_currency.dtypes, "<<<<<<<<<Dtype")
return dim_currency
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 1453c6c..0b5748b 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -5,7 +5,7 @@ import logging
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
-from dataframes import *
+from src.transform_lambda.dataframes import *
from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
diff --git a/tests/test_transform_lambda.py b/tests/test_transform_lambda.py
index 5ed743e..308dc65 100644
--- a/tests/test_transform_lambda.py
+++ b/tests/test_transform_lambda.py
@@ -1,4 +1,4 @@
-from src.transform_lambda import (
+from src.transform_lambda.transform_lambda import (
read_from_s3_subfolder_to_df,
list_existing_s3_files,
bucket_name,
git.ajschof.me — hosted by ajschofield — powered by cgit