diff options
| author | Ellie <ecsymonds@gmail.com> | 2024-08-22 16:55:48 +0100 |
|---|---|---|
| committer | Ellie <ecsymonds@gmail.com> | 2024-08-22 16:55:48 +0100 |
| commit | 67de54d70ee918bbaf537cb2c119990c4a70c9a7 (patch) | |
| tree | e08b788dbb6fdaa5479b7dfae9bd7e305b1028c3 /src | |
| parent | 032760a745353b0584bc635bd5c51aa928677fea (diff) | |
| download | de-project-bentley-67de54d70ee918bbaf537cb2c119990c4a70c9a7.tar.gz de-project-bentley-67de54d70ee918bbaf537cb2c119990c4a70c9a7.zip | |
add convert parquet to df function
Diffstat (limited to 'src')
| -rw-r--r-- | src/load_lambda.py | 50 |
1 files changed, 48 insertions, 2 deletions
diff --git a/src/load_lambda.py b/src/load_lambda.py index c6a8e60..2f0c33a 100644 --- a/src/load_lambda.py +++ b/src/load_lambda.py @@ -1,2 +1,48 @@ -def lambda_handler(): - pass +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError, identifier +import pandas as pd +import pyarrow.parquet as pq +from io import BytesIO + +from botocore.exceptions import ClientError +import logging + + +logger = logging.getLogger(__name__) + +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.DEBUG, +) + +logging.getLogger("botocore").setLevel(logging.WARNING) + +def convert_parquet_files_to_dfs(bucket_name=None, client=None): + try: + if client is None: + client = boto3.client("s3") + if bucket_name is None: + bucket_name = "transform_bucket" + files = client.list_objects_v2(Bucket=bucket_name) + + dfs = [] + for file in files: + file_key = file['Key'] + try: + file_obj = client.get_object(Bucket=bucket_name, Key=file_key) + parquet_file = pq.ParquetFile(BytesIO(file_obj['body'].read())) + df = parquet_file.read().to_pandas() + dfs.append(df) + except ClientError as e: + logger.error(f"Unable to retrieve S3 object {file_key}: {e}") + except ValueError as value_error: + logger.error(f"Unable to list objects: {value_error}") + raise + except ClientError as client_error: + logger.error(f"Unable to list objects: {client_error}") + + return dfs +
\ No newline at end of file |
