aboutsummaryrefslogtreecommitdiffstats
path: root/src/transform_lambda.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/transform_lambda.py')
-rw-r--r--src/transform_lambda.py40
1 files changed, 18 insertions, 22 deletions
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
index 9830e0f..3b1e9e6 100644
--- a/src/transform_lambda.py
+++ b/src/transform_lambda.py
@@ -11,6 +11,7 @@ from pg8000.native import Connection, InterfaceError
from datetime import datetime
import io
+
class DBConnectionException(Exception):
"""Wraps pg8000.native Error or DatabaseError."""
@@ -108,7 +109,7 @@ def process_to_parquet_and_upload_to_s3(
immutable_df_dict,
mutable_df_dict,
bucket,
- client=boto3.client("s3")
+ client=boto3.client("s3"),
):
status = {"uploaded": [], "not_uploaded": []}
@@ -117,13 +118,14 @@ def process_to_parquet_and_upload_to_s3(
status["not_uploaded"].append(table_name)
else:
parquet_buffer = io.BytesIO()
-
- df.to_parquet(parquet_buffer, engine="pyarrow") # or engine="fastparquet"
-
+
+ # or engine="fastparquet"
+ df.to_parquet(parquet_buffer, engine="pyarrow")
+
parquet_buffer.seek(0)
-
+
client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet")
-
+
status["uploaded"].append(table_name)
# for table_name, df in mutable_df_dict.items():
@@ -188,23 +190,17 @@ def read_from_s3_subfolder_to_df(tables, bucket, client=boto3.client("s3")):
return table_dfs
-
-
def bucket_name(bucket_prefix, client=boto3.client("s3")):
-
- response = client.list_buckets()
- bucket_filter = [
- bucket["Name"]
- for bucket in response["Buckets"]
- if bucket_prefix in bucket["Name"]
- ]
- if not bucket_filter:
- raise ValueError(f"No bucket found with prefix: {bucket_prefix}")
-
- return bucket_filter[0]
-
-
-
+ response = client.list_buckets()
+ bucket_filter = [
+ bucket["Name"]
+ for bucket in response["Buckets"]
+ if bucket_prefix in bucket["Name"]
+ ]
+ if not bucket_filter:
+ raise ValueError(f"No bucket found with prefix: {bucket_prefix}")
+
+ return bucket_filter[0]
def list_existing_s3_files(bucket_name, client=boto3.client("s3")):
git.ajschof.me — hosted by ajschofield — powered by cgit