aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda
diff options
context:
space:
mode:
Diffstat (limited to 'src/transform_lambda')
-rw-r--r--src/transform_lambda/transform_lambda.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index c25ab39..8a2cae8 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -11,7 +11,6 @@ from pg8000.native import Connection, InterfaceError
from datetime import datetime
-
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -115,13 +114,16 @@ def process_to_parquet_and_upload_to_s3(
if table_name in existing_s3_files:
status["not_uploaded"].append(table_name)
else:
-<<<<<<< HEAD:src/transform_lambda/transform_lambda.py
+
+
+<< << << < HEAD: src/transform_lambda/transform_lambda.py
parquet_file = df.to_parquet(
f"{table_name}.parquet", engine="pyarrow"
) # or fastparquet
# changed parquet_file variable to the file name
- client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
-=======
+ client.upload_file(f"{table_name}.parquet",
+ bucket, f"{table_name}.parquet")
+== == == =
parquet_buffer = io.BytesIO()
# or engine="fastparquet"
@@ -129,9 +131,10 @@ def process_to_parquet_and_upload_to_s3(
parquet_buffer.seek(0)
- client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet")
+ client.upload_fileobj(parquet_buffer, bucket,
+ f"{table_name}.parquet")
->>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py
+>>>>>> > 3f24ec753902feecec4c17e2877e19853bde1bb2: src/transform_lambda.py
status["uploaded"].append(table_name)
for table_name, df in mutable_df_dict.items():
git.ajschof.me — hosted by ajschofield — powered by cgit