diff options
| author | lian-manonog <lian.manonog@gmail.com> | 2024-08-28 14:59:05 +0100 |
|---|---|---|
| committer | lian-manonog <lian.manonog@gmail.com> | 2024-08-28 14:59:05 +0100 |
| commit | fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa (patch) | |
| tree | da2fff127164b54ec8af3e8157ca45da9e1ae56f /src/transform_lambda/transform_lambda.py | |
| parent | f9f1ebc3eb7a9d4f312db5c1402a0197e0777b29 (diff) | |
| download | de-project-bentley-fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa.tar.gz de-project-bentley-fadc54b7f72eca1eccbb9f4e7bb8ffca0960ebfa.zip | |
wip finished testing the process and upload parquet
Diffstat (limited to 'src/transform_lambda/transform_lambda.py')
| -rw-r--r-- | src/transform_lambda/transform_lambda.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py index 3dbb57b..478b257 100644 --- a/src/transform_lambda/transform_lambda.py +++ b/src/transform_lambda/transform_lambda.py @@ -5,12 +5,11 @@ import logging import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from dataframes import * +from src.transform_lambda.dataframes import * from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError from datetime import datetime - class DBConnectionException(Exception): """Wraps pg8000.native Error or DatabaseError.""" @@ -54,6 +53,7 @@ def lambda_handler(event, context): bucket = bucket_name("transform") existing_s3_files = list_existing_s3_files(bucket) + # print(existing_s3_files) dict_of_df = read_from_s3_subfolder_to_df( TABLES, bucket_name("extract"), client=boto3.client("s3") @@ -120,11 +120,13 @@ def process_to_parquet_and_upload_to_s3( # changed parquet_file variable to the file name client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet") status["uploaded"].append(table_name) + print(status) for table_name, df in mutable_df_dict.items(): s3_key = datetime.strftime( datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet" ) + print(s3_key, '<<<< this is S3_Key') parquet_file = df.to_parquet( f"{table_name}.parquet", engine="pyarrow" ) # or fastparquet |
