diff options
| author | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-27 17:00:04 +0100 |
|---|---|---|
| committer | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-27 17:00:04 +0100 |
| commit | 1a145a36d524a785c821aafbdb3512c24be6c57e (patch) | |
| tree | 822d2a621343cd2f3b1a6389f0ccf42bf69e63d6 /src | |
| parent | 22df92bcce7ec2d9e713b9609ffdd604d207e713 (diff) | |
| download | de-project-bentley-1a145a36d524a785c821aafbdb3512c24be6c57e.tar.gz de-project-bentley-1a145a36d524a785c821aafbdb3512c24be6c57e.zip | |
test: transform refactoring - it now loads parquet files into s3 bucket
Diffstat (limited to 'src')
| -rw-r--r-- | src/dataframes.py | 32 | ||||
| -rw-r--r-- | src/transform_lambda.py | 6 |
2 files changed, 19 insertions, 19 deletions
diff --git a/src/dataframes.py b/src/dataframes.py index 1f445a4..9d0f2ac 100644 --- a/src/dataframes.py +++ b/src/dataframes.py @@ -20,13 +20,13 @@ import requests def create_fact_sales_order(dict_of_df): df_sales = dict_of_df["sales_order"] df_sales.index.name = "sales_record_id" - df_sales["created_date"] = pd.to_datetime(df_sales["created_at"].dt.date,format='%Y-%m-%d') - df_sales["created_time"] = df_sales["created_at"].dt.floor('s').dt.time - df_sales["last_updated_date"] = pd.to_datetime(df_sales["last_updated"].dt.date,format='%Y-%m-%d') - df_sales["last_updated_time"] = df_sales["last_updated"].dt.floor('s').dt.time + df_sales["created_date"] = df_sales["created_at"].astype('datetime64[ns]').dt.date + df_sales["created_time"] = df_sales["created_at"].astype('datetime64[ns]').dt.floor('s').dt.time + df_sales["last_updated_date"] = df_sales["last_updated"].astype('datetime64[ns]').dt.date + df_sales["last_updated_time"] = df_sales["last_updated"].astype('datetime64[ns]').dt.floor('s').dt.time df_sales['agreed_delivery_date'] = pd.to_datetime(df_sales['agreed_delivery_date'],format="%Y-%m-%d") df_sales['agreed_payment_date'] = pd.to_datetime(df_sales['agreed_payment_date'],format="%Y-%m-%d") - df_sales.drop(labels=['created_at','last_updated'],axis=1,inplace=True) + df_sales = df_sales.drop(labels=['created_at','last_updated'],axis=1) df_sales.reset_index(inplace=True) return df_sales @@ -34,13 +34,13 @@ def create_fact_sales_order(dict_of_df): def create_fact_purchase_orders(dict_of_df): df_po = dict_of_df['purchase_order'] df_po.index.name = 'purchase_record_id' - df_po['created_date'] = pd.to_datetime(df_po['created_at'].dt.date,format='%Y-%m-%d') - df_po['created_time'] = df_po['created_at'].dt.floor('s').dt.time - df_po['last_updated_date'] = pd.to_datetime(df_po['last_updated'].dt.date,format='%Y-%m-%d') - df_po['last_updated_time'] = df_po['last_updated'].dt.floor('s').dt.time + df_po['created_date'] = df_po['created_at'].astype('datetime64[ns]').dt.date + df_po['created_time'] = df_po['created_at'].astype('datetime64[ns]').dt.floor('s').dt.time + df_po['last_updated_date'] = df_po['last_updated'].astype('datetime64[ns]').dt.date + df_po['last_updated_time'] = df_po['last_updated'].astype('datetime64[ns]').dt.floor('s').dt.time df_po['agreed_delivery_date'] = pd.to_datetime(df_po['agreed_delivery_date'],format="%Y-%m-%d") df_po['agreed_payment_date'] = pd.to_datetime(df_po['agreed_payment_date'],format="%Y-%m-%d") - df_po.drop(labels=['created_at','last_updated'],axis=1,inplace=True) + df_po = df_po.drop(labels=['created_at','last_updated'],axis=1) df_po.reset_index(inplace=True) return df_po @@ -48,12 +48,12 @@ def create_fact_purchase_orders(dict_of_df): def create_fact_payment(dict_of_df): df_payment = dict_of_df["payment"] df_payment.index.name = "payment_record_id" - df_payment["created_date"] = pd.to_datetime(df_payment["created_at"].dt.date,format='%Y-%m-%d') - df_payment["created_time"] = df_payment["created_at"].dt.floor('s').dt.time - df_payment["last_updated_date"] = pd.to_datetime(df_payment["last_updated"].dt.date,format='%Y-%m-%d') - df_payment["last_updated_time"] = df_payment["last_updated"].dt.floor('s').dt.time + df_payment["created_date"] = df_payment["created_at"].astype('datetime64[ns]').dt.date + df_payment["created_time"] = df_payment["created_at"].astype('datetime64[ns]').dt.floor('s').dt.time + df_payment["last_updated_date"] = df_payment["last_updated"].astype('datetime64[ns]').dt.date + df_payment["last_updated_time"] = df_payment["last_updated"].astype('datetime64[ns]').dt.floor('s').dt.time df_payment['payment_date'] = pd.to_datetime(df_payment['payment_date'],format="%Y-%m-%d") - df_payment.drop(labels=['created_at','last_updated'],axis=1,inplace=True) + df_payment = df_payment.drop(labels=['created_at','last_updated'],axis=1) df_payment.reset_index(inplace=True) return df_payment @@ -83,7 +83,7 @@ def create_dim_date(dict_of_df): fact_dfs = [create_fact_payment(dict_of_df), create_fact_purchase_orders(dict_of_df), create_fact_sales_order(dict_of_df)] list_of_date_columns = [] for df in fact_dfs: - date_col_names = [col_name for col_name in list(df.columns) if 'date' in col_name] + date_col_names = [col_name for col_name in list(df.columns) if '_date' in col_name] for col in date_col_names: list_of_date_columns.append(df[col]) sr_date = pd.array(pd.concat(list_of_date_columns),dtype='datetime64[ns]') diff --git a/src/transform_lambda.py b/src/transform_lambda.py index 2cd9272..ccf90e5 100644 --- a/src/transform_lambda.py +++ b/src/transform_lambda.py @@ -117,7 +117,7 @@ def process_to_parquet_and_upload_to_s3( parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet - client.upload_file(parquet_file, bucket, f"{table_name}.parquet") + client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") #changed parquet_file variable to the file name status["uploaded"].append(table_name) for table_name, df in mutable_df_dict.items(): @@ -127,7 +127,7 @@ def process_to_parquet_and_upload_to_s3( parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet - client.upload_file(parquet_file, bucket, s3_key) + client.upload_file(f"{table_name}.parquet", bucket, s3_key) status["uploaded"].append(table_name) return status @@ -203,7 +203,7 @@ def list_existing_s3_files(bucket_name, client=boto3.client("s3")): existing_files = [obj["Key"] for obj in response["Contents"]] else: logger.error("The bucket is empty") - return None + return [] #changed from None to [] so it is an iterable except ClientError as e: logger.error(f"Error listing S3 objects: {e}") |
