aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorHastarTara <joslinrashleigh@gmail.com>2024-08-27 17:02:25 +0100
committerHastarTara <joslinrashleigh@gmail.com>2024-08-27 17:02:25 +0100
commitad357ff34202827720dc216562dfbb0fbd65c297 (patch)
tree6dff86986704ef76c8f274e39e3674f443dc1466 /src
parent836f71dbea59a35b2eeeeeb982a73c4366089722 (diff)
downloadde-project-bentley-ad357ff34202827720dc216562dfbb0fbd65c297.tar.gz
de-project-bentley-ad357ff34202827720dc216562dfbb0fbd65c297.zip
test updates to transform lambda handler
Diffstat (limited to 'src')
-rw-r--r--src/transform_lambda.py59
1 files changed, 34 insertions, 25 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index cd9541d..9830e0f 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -9,7 +9,7 @@ import pyarrow.parquet as pq
from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
+import io
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -59,6 +59,8 @@ def lambda_handler(event, context):
TABLES, bucket_name("extract"), client=boto3.client("s3")
)
+ print(dict_of_df)
+
immutable_df_dict = {
"dim_counterparty": create_dim_counterparty(dict_of_df),
"dim_date": create_dim_date(dict_of_df),
@@ -106,7 +108,7 @@ def process_to_parquet_and_upload_to_s3(
immutable_df_dict,
mutable_df_dict,
bucket,
- client=boto3.client("s3"),
+ client=boto3.client("s3")
):
status = {"uploaded": [], "not_uploaded": []}
@@ -114,21 +116,25 @@ def process_to_parquet_and_upload_to_s3(
if table_name in existing_s3_files:
status["not_uploaded"].append(table_name)
else:
- parquet_file = df.to_parquet(
- f"{table_name}.parquet", engine="pyarrow"
- ) # or fastparquet
- client.upload_file(parquet_file, bucket, f"{table_name}.parquet")
+ parquet_buffer = io.BytesIO()
+
+ df.to_parquet(parquet_buffer, engine="pyarrow") # or engine="fastparquet"
+
+ parquet_buffer.seek(0)
+
+ client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet")
+
status["uploaded"].append(table_name)
- for table_name, df in mutable_df_dict.items():
- s3_key = datetime.strftime(
- datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
- )
- parquet_file = df.to_parquet(
- f"{table_name}.parquet", engine="pyarrow"
- ) # or fastparquet
- client.upload_file(parquet_file, bucket, s3_key)
- status["uploaded"].append(table_name)
+ # for table_name, df in mutable_df_dict.items():
+ # s3_key = datetime.strftime(
+ # datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.parquet"
+ # )
+ # parquet_file = df.to_parquet(
+ # f"{table_name}.parquet", engine="pyarrow"
+ # ) # or fastparquet
+ # client.upload_file(parquet_file, bucket, s3_key)
+ # status["uploaded"].append(table_name)
return status
@@ -182,20 +188,23 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
return table_dfs
+
+
def bucket_name(bucket_prefix, client=boto3.client("s3")):
- # response = client.list_buckets()
- # for bucket in response["Buckets"]:
- # if bucket_prefix in bucket["Name"]:
- # return bucket["Name"]
-
-
- response = client.list_buckets()
- bucket_filter = [
+
+ response = client.list_buckets()
+ bucket_filter = [
bucket["Name"]
for bucket in response["Buckets"]
if bucket_prefix in bucket["Name"]
- ]
- return bucket_filter[0]
+ ]
+ if not bucket_filter:
+ raise ValueError(f"No bucket found with prefix: {bucket_prefix}")
+
+ return bucket_filter[0]
+
+
+
def list_existing_s3_files(bucket_name, client=boto3.client("s3")):
git.ajschof.me — hosted by ajschofield — powered by cgit