aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/load_lambda.py78
-rw-r--r--src/transform_lambda/dataframes.py116
-rw-r--r--src/transform_lambda/transform_lambda.py6
3 files changed, 105 insertions, 95 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 941ae97..86189dc 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -58,10 +58,14 @@ def retrieve_secrets(client=None, secret_name=None):
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
- logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True)
+ logger.error(
+ f"Failed to retrieve secret {secret_name}: {str(e)}", exc_info=True
+ )
raise e
except KeyError:
- logger.error(f"Secret {secret_name} does not contain a SecretString", exc_info=True)
+ logger.error(
+ f"Secret {secret_name} does not contain a SecretString", exc_info=True
+ )
raise ValueError(f"Secret {secret_name} does not contain a SecretString")
return get_secret_value_response["SecretString"]
@@ -117,6 +121,7 @@ def get_transform_bucket(client=None):
# convert parquet files into dataframes
# return a dictionary of dataframes with name as key, and dataframe object as value
+
def get_latest_timestamp(existing_files):
if existing_files:
all_datetimes = []
@@ -124,19 +129,17 @@ def get_latest_timestamp(existing_files):
match = re.search(r"\/(.+/).+_(.+)\.parquet", file_name)
if match:
datetime_str = "".join(match.group(1, 2))
- all_datetimes.append(
- dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
- )
+ all_datetimes.append(dt.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S"))
return max(all_datetimes) if all_datetimes else dt.min
return existing_files
+
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
mutable_df_dict = [
"dim_currency",
"fact_sales_order",
"fact_purchase_order",
- "fact_payment"
-
+ "fact_payment",
]
try:
@@ -145,12 +148,12 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
if bucket_name is None:
bucket_name = get_transform_bucket()
files = client.list_objects_v2(Bucket=bucket_name)
-
+
dfs = {}
if "Contents" in files:
- s3_key_list = [file["Key"]for file in files["Contents"]]
+ s3_key_list = [file["Key"] for file in files["Contents"]]
immutables_l = []
- mutables_d = {prefix:[] for prefix in mutable_df_dict}
+ mutables_d = {prefix: [] for prefix in mutable_df_dict}
for tab, s3_key in mutables_d.items():
for file in s3_key_list:
if tab in file:
@@ -161,22 +164,31 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
continue
immutables_l = list(set(immutables_l))
latest_s3_keys = []
- for k,v in mutables_d.items():
- latest_s3_keys.append(dt.strftime(get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"))
- for file_key in immutables_l+latest_s3_keys:
+ for k, v in mutables_d.items():
+ latest_s3_keys.append(
+ dt.strftime(
+ get_latest_timestamp(v), f"{k}/%Y/%m/%d/{k}_%H:%M:%S.parquet"
+ )
+ )
+ for file_key in immutables_l + latest_s3_keys:
try:
file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
df = parquet_file.read().to_pandas()
- df_without_nulls = df.dropna(how='all') #>> can't do 'any' (default) because we lose rows in dim_location
- #print("df_without_nulls", df_without_nulls)
- #print("type", type(df_without_nulls))
- #print(df_without_nulls.columns)
+ # >> can't do 'any' (default) because we lose rows in dim_location
+ df_without_nulls = df.dropna(how="all")
+ # print("df_without_nulls", df_without_nulls)
+ # print("type", type(df_without_nulls))
+ # print(df_without_nulls.columns)
dfs[file_key] = df_without_nulls
except ClientError as e:
- logger.error(f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True)
+ logger.error(
+ f"Unable to retrieve S3 object {file_key}: {e}", exc_info=True
+ )
except Exception as e:
- logger.error(f"Unable to process file {file_key}: {e}", exc_info=True)
+ logger.error(
+ f"Unable to process file {file_key}: {e}", exc_info=True
+ )
else:
logger.error(f"No files found in {bucket_name}.", exc_info=True)
return {}
@@ -199,23 +211,22 @@ def upload_dfs_to_database():
"dim_location.parquet",
"dim_staff.parquet",
"dim_design.parquet",
- 'dim_transaction.parquet', #This one was missing,
- 'dim_payment_type.parquet'
+ "dim_transaction.parquet", # This one was missing,
+ "dim_payment_type.parquet",
]
mutable_df_dict = [
"dim_currency",
"fact_sales_order",
"fact_purchase_order",
- "fact_payment"
-
+ "fact_payment",
]
with db_engine.begin() as connection:
for file_name, df in dict_of_dfs.items():
print(df.dtypes, "dtypes")
print(df.head())
- print(file_name,"<<< FILE NAME")
- print(immutable_df_dict,"<<<IMMUTABLE_DF_DICT")
- if file_name in immutable_df_dict:
+ print(file_name, "<<< FILE NAME")
+ print(immutable_df_dict, "<<<IMMUTABLE_DF_DICT")
+ if file_name in immutable_df_dict:
table_name = file_name.split(".")[0]
print(table_name, "<<<<<")
try:
@@ -229,7 +240,10 @@ def upload_dfs_to_database():
upload_status["uploaded"].append(table_name)
print(upload_status)
except Exception as e:
- logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True)
+ logger.error(
+ f"Error uploading dataframe {file_name} to database: {e}",
+ exc_info=True,
+ )
raise
elif file_name.split("/")[0] in mutable_df_dict:
table_name = file_name.split("/")[0]
@@ -244,11 +258,17 @@ def upload_dfs_to_database():
)
upload_status["uploaded"].append(table_name)
except Exception as e:
- logger.error(f"Error uploading dataframe {file_name} to database: {e}", exc_info=True)
+ logger.error(
+ f"Error uploading dataframe {file_name} to database: {e}",
+ exc_info=True,
+ )
raise
else:
upload_status["not_uploaded"].append(file_name)
- logger.error(f"{file_name} does not correspond with table in database", exc_info=True)
+ logger.error(
+ f"{file_name} does not correspond with table in database",
+ exc_info=True,
+ )
print(upload_status)
db_engine.dispose()
return upload_status
diff --git a/src/transform_lambda/dataframes.py b/src/transform_lambda/dataframes.py
index c823b87..6de58e7 100644
--- a/src/transform_lambda/dataframes.py
+++ b/src/transform_lambda/dataframes.py
@@ -36,7 +36,8 @@ def create_fact_sales_order(dict_of_df):
df_sales["agreed_payment_date"] = pd.to_datetime(
df_sales["agreed_payment_date"], format="%Y-%m-%d"
)
- fact_sales = df_sales.loc[:,
+ fact_sales = df_sales.loc[
+ :,
[
"sales_order_id",
"created_date",
@@ -51,7 +52,7 @@ def create_fact_sales_order(dict_of_df):
"design_id",
"agreed_payment_date",
"agreed_delivery_date",
- "agreed_delivery_location_id"
+ "agreed_delivery_location_id",
],
]
fact_sales.convert_dtypes()
@@ -81,24 +82,24 @@ def create_fact_purchase_orders(dict_of_df):
df_po["agreed_payment_date"] = pd.to_datetime(
df_po["agreed_payment_date"], format="%Y-%m-%d"
)
- fact_purchase_order = df_po.loc[:,
- [
- "purchase_order_id",
- "created_date",
- "created_time",
- "last_updated_date",
- "last_updated_time",
- "staff_id",
- "counterparty_id",
- "item_code",
- "item_quantity",
- "item_unit_price",
- "currency_id",
- "agreed_delivery_date",
- "agreed_payment_date",
- "agreed_delivery_location_id"
- ]
-
+ fact_purchase_order = df_po.loc[
+ :,
+ [
+ "purchase_order_id",
+ "created_date",
+ "created_time",
+ "last_updated_date",
+ "last_updated_time",
+ "staff_id",
+ "counterparty_id",
+ "item_code",
+ "item_quantity",
+ "item_unit_price",
+ "currency_id",
+ "agreed_delivery_date",
+ "agreed_payment_date",
+ "agreed_delivery_location_id",
+ ],
]
fact_purchase_order.convert_dtypes()
fact_purchase_order.index = pd.RangeIndex(1, len(fact_purchase_order.index) + 1)
@@ -128,28 +129,29 @@ def create_fact_payment(dict_of_df):
df_payment["payment_date"] = pd.to_datetime(
df_payment["payment_date"], format="%Y-%m-%d"
)
- fact_payment = df_payment.loc[:,
+ fact_payment = df_payment.loc[
+ :,
[
- "payment_id",
- "created_date",
- "created_time",
- "last_updated_date",
- "last_updated_time",
- "transaction_id",
- "counterparty_id",
- "payment_amount",
- "currency_id",
- "payment_type_id",
- "paid",
- "payment_date"
- ]
+ "payment_id",
+ "created_date",
+ "created_time",
+ "last_updated_date",
+ "last_updated_time",
+ "transaction_id",
+ "counterparty_id",
+ "payment_amount",
+ "currency_id",
+ "payment_type_id",
+ "paid",
+ "payment_date",
+ ],
]
fact_payment.convert_dtypes()
fact_payment.index = pd.RangeIndex(1, len(fact_payment.index) + 1)
fact_payment.index.name = "payment_record_id"
fact_payment.reset_index(inplace=True)
fact_payment.dropna(inplace=True)
- fact_payment = fact_payment.astype({'currency_id':'int','payment_id':'int'})
+ fact_payment = fact_payment.astype({"currency_id": "int", "payment_id": "int"})
return fact_payment
@@ -157,15 +159,10 @@ def create_fact_payment(dict_of_df):
def create_dim_transaction(dict_of_df):
- dim_transaction = dict_of_df["transaction"].loc[:,
- [
- "transaction_id",
- "transaction_type",
- "sales_order_id",
- "purchase_order_id"
- ]
+ dim_transaction = dict_of_df["transaction"].loc[
+ :, ["transaction_id", "transaction_type", "sales_order_id", "purchase_order_id"]
]
- #dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"})
+ # dim_transaction = dim_transaction.astype({"sales_order_id":"Int64","purchase_order_id":"Int64"})
return dim_transaction
@@ -174,7 +171,8 @@ def create_dim_transaction(dict_of_df):
def create_dim_location(dict_of_df):
dim_location = (
- dict_of_df["address"].drop(labels=["created_at", "last_updated"], axis=1)
+ dict_of_df["address"]
+ .drop(labels=["created_at", "last_updated"], axis=1)
.rename(columns={"address_id": "location_id"})
)
return dim_location
@@ -193,7 +191,7 @@ def create_dim_counterparty(dict_of_df):
left_on="legal_address_id",
right_on="counterparty_legal_address_id",
how="inner",
- )#.dropna(inplace=True)
+ ) # .dropna(inplace=True)
dim_counterparty = df_cp.drop(
labels=[
"legal_address_id",
@@ -201,8 +199,9 @@ def create_dim_counterparty(dict_of_df):
"created_at",
"last_updated",
"commercial_contact",
- "delivery_contact"
- ], axis=1
+ "delivery_contact",
+ ],
+ axis=1,
)
return dim_counterparty
@@ -272,12 +271,7 @@ def create_dim_currency(dict_of_df, names=scrape_currency_names()):
def create_dim_payment_type(dict_of_df):
df_payment_type = dict_of_df["payment_type"]
- dim_payment_type = df_payment_type.loc[:,
- [
- "payment_type_id",
- "payment_type_name"
- ]
- ]
+ dim_payment_type = df_payment_type.loc[:, ["payment_type_id", "payment_type_name"]]
return dim_payment_type
@@ -286,13 +280,8 @@ def create_dim_payment_type(dict_of_df):
def create_dim_design(dict_of_df):
df_design = dict_of_df["design"]
- dim_design = df_design.loc[:,
- [
- "design_id",
- "design_name",
- "file_name",
- "file_location"
- ]
+ dim_design = df_design.loc[
+ :, ["design_id", "design_name", "file_name", "file_location"]
]
return dim_design
@@ -304,14 +293,15 @@ def create_dim_staff(dict_of_df):
staff_department = pd.merge(
dict_of_df["staff"], dict_of_df["department"], on="department_id", how="left"
)
- dim_staff = staff_department.loc[:,
+ dim_staff = staff_department.loc[
+ :,
[
"staff_id",
"first_name",
"last_name",
"department_name",
"location",
- "email_address"
- ]
+ "email_address",
+ ],
]
return dim_staff
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 5ea8cf0..2739997 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -42,7 +42,7 @@ TABLES = [
"department",
"currency",
"design",
- "payment_type"
+ "payment_type",
]
@@ -66,14 +66,14 @@ def lambda_handler(event, context):
"dim_staff": create_dim_staff(dict_of_df),
"dim_design": create_dim_design(dict_of_df),
"dim_transaction": create_dim_transaction(dict_of_df),
- "dim_payment_type": create_dim_payment_type(dict_of_df)
+ "dim_payment_type": create_dim_payment_type(dict_of_df),
}
mutable_df_dict = {
"fact_sales_order": create_fact_sales_order(dict_of_df),
"fact_purchase_order": create_fact_purchase_orders(dict_of_df),
"fact_payment": create_fact_payment(dict_of_df),
- "dim_currency": create_dim_currency(dict_of_df)
+ "dim_currency": create_dim_currency(dict_of_df),
}
print(immutable_df_dict.values())
print(mutable_df_dict.values())
git.ajschof.me — hosted by ajschofield — powered by cgit