aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda
diff options
context:
space:
mode:
Diffstat (limited to 'src/transform_lambda')
-rw-r--r--src/transform_lambda/transform_lambda.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 3dbb57b..478b257 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -5,12 +5,11 @@ import logging
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
-from dataframes import *
+from src.transform_lambda.dataframes import *
from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -54,6 +53,7 @@ def lambda_handler(event, context):
bucket = bucket_name("transform")
existing_s3_files = list_existing_s3_files(bucket)
+ # print(existing_s3_files)
dict_of_df = read_from_s3_subfolder_to_df(
TABLES, bucket_name("extract"), client=boto3.client("s3")
@@ -120,11 +120,13 @@ def process_to_parquet_and_upload_to_s3(
# changed parquet_file variable to the file name
client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
status["uploaded"].append(table_name)
+ print(status)
for table_name, df in mutable_df_dict.items():
s3_key = datetime.strftime(
datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
)
+ print(s3_key, '<<<< this is S3_Key')
parquet_file = df.to_parquet(
f"{table_name}.parquet", engine="pyarrow"
) # or fastparquet
git.ajschof.me — hosted by ajschofield — powered by cgit