aboutsummaryrefslogtreecommitdiffstats
path: root/src/load_lambda.py
blob: a3fd996d4553c8ed86354909dfc1b6c39f5ab899 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
import logging


# Root logging configuration: "{"-style format with minute-resolution
# timestamps, everything from DEBUG upwards.
# NOTE(review): logging.basicConfig does nothing if the root logger already
# has a handler (some hosted runtimes install one before this module is
# imported) — confirm this configuration actually takes effect where deployed.
logging.basicConfig(
    level=logging.DEBUG,
    style="{",
    format="{asctime} - {levelname} - {message}",
    datefmt="%Y-%m-%d %H:%M",
)

# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)

# botocore logs very verbosely at DEBUG; only let its warnings and errors through.
logging.getLogger("botocore").setLevel(logging.WARNING)

def transform_bucket(client=None):
    """Return the name of the first S3 bucket whose name contains "transform".

    Buckets are examined in the order returned by ``list_buckets``; if several
    names match, the first one in that order is returned.

    Args:
        client: S3 client to query. When omitted, a new ``boto3.client("s3")``
            is created.

    Returns:
        The matching bucket name.

    Raises:
        ValueError: If no bucket name contains "transform".
    """
    s3 = boto3.client("s3") if client is None else client
    bucket_names = [entry["Name"] for entry in s3.list_buckets()["Buckets"]]
    candidates = [name for name in bucket_names if "transform" in name]
    if candidates:
        return candidates[0]
    raise ValueError("No transform_bucket found")

def _list_object_keys(client, bucket_name):
    """Return every object key in ``bucket_name``, following pagination.

    ``list_objects_v2`` returns at most 1,000 keys per call, so further pages are
    requested with ``ContinuationToken`` until the response is not truncated.
    Returns an empty list when the bucket has no objects.
    """
    keys = []
    request = {"Bucket": bucket_name}
    while True:
        response = client.list_objects_v2(**request)
        keys.extend(obj["Key"] for obj in response.get("Contents", []))
        # boto3 reports IsTruncated as a real bool; comparing with ``is True``
        # also stops cleanly if a test double returns something else.
        if response.get("IsTruncated") is not True:
            return keys
        request["ContinuationToken"] = response["NextContinuationToken"]


def convert_parquet_files_to_dfs(bucket_name=None, client=None):
    """Download every object in an S3 bucket and load each as a pandas DataFrame.

    Every object in the bucket is listed (across all result pages), downloaded,
    parsed as Parquet with pyarrow and converted to a DataFrame.

    Args:
        bucket_name: Bucket to read from. When ``None``, it is resolved with
            ``transform_bucket(client)``.
        client: S3 client to use. When ``None``, a new ``boto3.client("s3")``
            is created.

    Returns:
        A list with one DataFrame per object that was fetched and parsed
        successfully, in listing order. Objects that fail to download or parse
        are logged and skipped. An empty list is returned (and an error logged)
        when the bucket contains no objects.

    Raises:
        ValueError: If ``bucket_name`` is ``None`` and ``transform_bucket``
            finds no matching bucket.
        botocore.exceptions.ClientError: If listing the bucket's objects fails.
    """
    try:
        if client is None:
            client = boto3.client("s3")
        if bucket_name is None:
            bucket_name = transform_bucket(client)
        file_keys = _list_object_keys(client, bucket_name)
    except ValueError as value_error:
        logger.error(f"Unable to list objects: {value_error}")
        raise
    except ClientError as client_error:
        logger.error(f"Unable to list objects: {client_error}")
        raise

    if not file_keys:
        logger.error(f"No files found in {bucket_name}.")
        return []

    dfs = []
    for file_key in file_keys:
        try:
            file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
            # Buffer the whole object in memory: ParquetFile needs a seekable source.
            parquet_file = pq.ParquetFile(BytesIO(file_obj["Body"].read()))
            dfs.append(parquet_file.read().to_pandas())
        except ClientError as e:
            logger.error(f"Unable to retrieve S3 object {file_key}: {e}")
        except Exception as e:
            # Best-effort: one unreadable object must not abort the whole load.
            logger.error(f"Unable to process file {file_key}: {e}")

    return dfs
git.ajschof.me — hosted by ajschofield — powered by cgit