diff options
| author | lian-manonog <lian.manonog@gmail.com> | 2024-08-28 11:37:09 +0100 |
|---|---|---|
| committer | lian-manonog <lian.manonog@gmail.com> | 2024-08-28 11:37:09 +0100 |
| commit | 4651e2f951a4c6c2605ded7abeb90197e50d61c6 (patch) | |
| tree | 438f69e0aa4006937eb014dd38cde6c44481c99f /src/transform_lambda/transform_lambda.py | |
| parent | 459702f2bdd3070923187ec0d4c76c85dbe0d235 (diff) | |
| parent | 3f24ec753902feecec4c17e2877e19853bde1bb2 (diff) | |
| download | de-project-bentley-4651e2f951a4c6c2605ded7abeb90197e50d61c6.tar.gz de-project-bentley-4651e2f951a4c6c2605ded7abeb90197e50d61c6.zip | |
Merge branch 'test/transform-lambda' of https://github.com/ajschofield/de-project-bentley into test/transform-lambda
Diffstat (limited to 'src/transform_lambda/transform_lambda.py')
| -rw-r--r-- | src/transform_lambda/transform_lambda.py | 18 |
1 files changed, 18 insertions, 0 deletions
```diff
diff --git a/src/transform_lambda/transform_lambda.py b/src/transform_lambda/transform_lambda.py
index 93b2284..c25ab39 100644
--- a/src/transform_lambda/transform_lambda.py
+++ b/src/transform_lambda/transform_lambda.py
@@ -11,6 +11,7 @@ from pg8000.native import Connection, InterfaceError
 from datetime import datetime
 
+
 class DBConnectionException(Exception):
     """Wraps pg8000.native Error or DatabaseError."""
 
@@ -114,11 +115,23 @@ def process_to_parquet_and_upload_to_s3(
         if table_name in existing_s3_files:
             status["not_uploaded"].append(table_name)
         else:
+<<<<<<< HEAD:src/transform_lambda/transform_lambda.py
             parquet_file = df.to_parquet(
                 f"{table_name}.parquet", engine="pyarrow"
             ) # or fastparquet
             # changed parquet_file variable to the file name
             client.upload_file(f"{table_name}.parquet", bucket, f"{table_name}.parquet")
+=======
+            parquet_buffer = io.BytesIO()
+
+            # or engine="fastparquet"
+            df.to_parquet(parquet_buffer, engine="pyarrow")
+
+            parquet_buffer.seek(0)
+
+            client.upload_fileobj(parquet_buffer, bucket, f"{table_name}.parquet")
+
+>>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py
             status["uploaded"].append(table_name)
 
     for table_name, df in mutable_df_dict.items():
@@ -190,6 +203,11 @@ def bucket_name(bucket_prefix, client=boto3.client("s3")):
         for bucket in response["Buckets"]
         if bucket_prefix in bucket["Name"]
     ]
+<<<<<<< HEAD:src/transform_lambda/transform_lambda.py
+=======
+    if not bucket_filter:
+        raise ValueError(f"No bucket found with prefix: {bucket_prefix}")
+>>>>>>> 3f24ec753902feecec4c17e2877e19853bde1bb2:src/transform_lambda.py
     return bucket_filter[0]
```
