aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorEllie <ecsymonds@gmail.com>2024-08-23 09:40:08 +0100
committerEllie <ecsymonds@gmail.com>2024-08-23 09:40:08 +0100
commit6bf831c5387408e92a63cb5667aab8f415b536e4 (patch)
tree90bf53d2d25d62c61edfbe3fe0adaadaa3c26218 /src
parenta5b4056961ae65b4b2b1fe3afaf1561b2ba749ae (diff)
downloadde-project-bentley-6bf831c5387408e92a63cb5667aab8f415b536e4.tar.gz
de-project-bentley-6bf831c5387408e92a63cb5667aab8f415b536e4.zip
add improved convert parquet files to df function
Diffstat (limited to 'src')
-rw-r--r--src/load_lambda.py33
1 files changed, 19 insertions, 14 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py
index 2f0c33a..1813db4 100644
--- a/src/load_lambda.py
+++ b/src/load_lambda.py
@@ -1,11 +1,8 @@
import boto3
from botocore.exceptions import ClientError
-from pg8000.native import Connection, InterfaceError, identifier
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
-
-from botocore.exceptions import ClientError
import logging
@@ -19,7 +16,9 @@ logging.basicConfig(
)
logging.getLogger("botocore").setLevel(logging.WARNING)
-
+
+# list and then retrieve parquet files from S3 bucket
+# convert parquet files into dataframes and return a list of dataframes
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
try:
if client is None:
@@ -29,20 +28,26 @@ def convert_parquet_files_to_dfs(bucket_name=None, client=None):
files = client.list_objects_v2(Bucket=bucket_name)
dfs = []
- for file in files:
- file_key = file['Key']
- try:
- file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
- parquet_file = pq.ParquetFile(BytesIO(file_obj['body'].read()))
- df = parquet_file.read().to_pandas()
- dfs.append(df)
- except ClientError as e:
- logger.error(f"Unable to retrieve S3 object {file_key}: {e}")
+ if "Contents" in files:
+ for file in files["Contents"]:
+ file_key = file['Key']
+ try:
+ file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
+ parquet_file = pq.ParquetFile(BytesIO(file_obj['Body'].read()))
+ df = parquet_file.read().to_pandas()
+ dfs.append(df)
+ except ClientError as e:
+ logger.error(f"Unable to retrieve S3 object {file_key}: {e}")
+ except Exception as e:
+ logger.error(f"Unable to process file {file_key}: {e}")
+ else:
+ logger.error(f"No files found in {bucket_name}.")
+ return []
except ValueError as value_error:
logger.error(f"Unable to list objects: {value_error}")
raise
except ClientError as client_error:
logger.error(f"Unable to list objects: {client_error}")
+ raise
return dfs
- \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit