aboutsummaryrefslogtreecommitdiffstats
path: root/src/load_lambda.py
blob: d95c27ab71e2b76e412414b871fe82be8e1732a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO
import logging
import json
from src.extract_lambda import retrieve_secrets, connect_to_database
from sqlalchemy import create_engine


logger = logging.getLogger(__name__)

logging.basicConfig(
    format="{asctime} - {levelname} - {message}",
    style="{",
    datefmt="%Y-%m-%d %H:%M",
    level=logging.DEBUG,
)

logging.getLogger("botocore").setLevel(logging.WARNING)

def lambda_handler(event, context):
    db = None
    try:
        uploaded_tables = upload_dfs_to_database()
        if uploaded_tables == []:
            return {
                "statusCode": 200,
                "body": json.dumps("No datframes were uploaded."),
            }
        return {
            "statusCode": 200,
            "body": json.dumps(
                f"""The following dataframes were uploaded successfully: 
                {', '.join(upload_dfs_to_database['updated'])}."""
            ),
        }
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        return {"statusCode": 500, "body": json.dumps("Internal server error.")}
    finally:
        if db:
            db.close()

# connect to database, slightly different way of doing it, to allow manipulation through pandas
def connect_to_db_and_return_engine():
    secrets = json.loads(retrieve_secrets("bentley-RDS-credentials"))  #need to amend retrieve secrets function
    host = secrets["host"]
    port = secrets["port"]
    user = secrets["user"]
    password = secrets["password"]
    database = secrets["database"]
    conn_str = f'postgresql+pg8000://{user}:{password}@{host}:{port}/{database}'
    engine = create_engine(conn_str) #interface between python (pandas) and SQL
    return engine



# get transform bucket
def transform_bucket(client=None):
    if client is None:
        client = boto3.client("s3")
    response = client.list_buckets()
    transform_bucket_filter = [
        bucket["Name"] for bucket in response["Buckets"] if "transform" in bucket["Name"]
    ]

    if not transform_bucket_filter:
        raise ValueError("No transform_bucket found")

    return transform_bucket_filter[0]

# list and then retrieve parquet files from S3 bucket
# convert parquet files into dataframes and return a list of dataframes  
def convert_parquet_files_to_dfs(bucket_name=None, client=None):
    try:
        if client is None:
            client = boto3.client("s3")
        if bucket_name is None:
            bucket_name = transform_bucket(client)
        files = client.list_objects_v2(Bucket=bucket_name)

        dfs = {}
        if "Contents" in files:
            for file in files["Contents"]:
                file_key = file['Key']
                try:
                    file_obj = client.get_object(Bucket=bucket_name, Key=file_key)
                    parquet_file = pq.ParquetFile(BytesIO(file_obj['Body'].read()))
                    df = parquet_file.read().to_pandas()
                    dfs[file_key] = df
                except ClientError as e:
                    logger.error(f"Unable to retrieve S3 object {file_key}: {e}")
                except Exception as e:
                    logger.error(f"Unable to process file {file_key}: {e}")
        else:
            logger.error(f"No files found in {bucket_name}.")
            return []
    except ValueError as value_error:
        logger.error(f"Unable to list objects: {value_error}")
        raise
    except ClientError as client_error:
        logger.error(f"Unable to list objects: {client_error}")
        raise

    return dfs

def upload_dfs_to_database():
    uploaded = []
    dict_of_dfs = convert_parquet_files_to_dfs()
    db_engine = connect_to_db_and_return_engine()
    try:
        for table_name, df in dict_of_dfs:
            df.to_sql(table_name, con=db_engine, ifexists="replace", index=False)
            uploaded.append(table_name)
    except Exception as e:
        logger.error(f"Error uploading dataframes: {e}")
    db_engine.dispose()
    return uploaded

    # aiming to return a list of uploaded tables
git.ajschof.me — hosted by ajschofield — powered by cgit