aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda
diff options
context:
space:
mode:
Diffstat (limited to 'src/transform_lambda')
-rw-r--r--src/transform_lambda/transform_lambda.py18
1 files changed, 18 insertions, 0 deletions
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 93b2284..c25ab39 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -11,6 +11,7 @@ from pg8000.native import Connection, InterfaceError
from datetime import datetime
+
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -114,11 +115,23 @@ def process_to_parquet_and_upload_to_s3(
if table_name in existing_s3_files:
status["not_uploaded"].append(table_name)
else:
+<<<<<<< HEAD:src/transform_lambda/transform_lambda.py
parquet_file = df.to_parquet(
f"{table_name}.parquet", engine="pyarrow"
) # or fastparquet
# changed parquet_file variable to the file name
client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
+=======
+ parquet_buffer = io.BytesIO()
+
+ # or engine="fastparquet"
+ df.to_parquet(parquet_buffer, engine="pyarrow")
+
+ parquet_buffer.seek(0)
+
+ client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet")
+
+>>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py
status["uploaded"].append(table_name)
for table_name, df in mutable_df_dict.items():
@@ -190,6 +203,11 @@ def bucket_name(bucket_prefix, client=boto3.client("s3")):
for bucket in response["Buckets"]
if bucket_prefix in bucket["Name"]
]
+<<<<<<< HEAD:src/transform_lambda/transform_lambda.py
+=======
+ if not bucket_filter:
+ raise ValueError(f"No bucket found with prefix: {bucket_prefix}")
+>>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py
return bucket_filter[0]
git.ajschof.me — hosted by ajschofield — powered by cgit