From 5be3b130170c82360ff9715f5c09b9e815fc16f4 Mon Sep 17 00:00:00 2001
From: Alex Schofield
Date: Mon, 19 Aug 2024 23:32:25 +0100
Subject: feat: use buffers for s3 upload instead of csv files

---
 src/extract_lambda.py | 50 +++++++++++++++++++++++++++++++++-----------------
 1 file changed, 33 insertions(+), 17 deletions(-)

(limited to 'src')

diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 7c6c3d1..f38e24a 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -3,6 +3,7 @@ import json
 import logging
 import re
 from datetime import datetime
+from io import StringIO
 
 import boto3
 from botocore.exceptions import ClientError
@@ -139,6 +140,26 @@ def get_latest_timestamp(existing_files):
     return max(all_datetimes) if all_datetimes else datetime.min
 
 
+def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key):
+    csv_buffer = StringIO()
+    csv_writer = csv.writer(csv_buffer)
+
+    csv_writer.writerow(column_names)
+
+    for row in rows:
+        csv_writer.writerow(row)
+
+        if csv_buffer.tell() > 5 * 1024 * 1024:
+            csv_buffer.seek(0)
+            s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
+            csv_buffer.truncate(0)
+            csv_buffer.seek(0)
+
+    if csv_buffer.tell() > 0:
+        csv_buffer.seek(0)
+        s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
+
+
 def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
     """Creates a list of the tables from a database query and then
     selects everything from each table in individual queries
@@ -164,29 +185,24 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
             f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;",
             latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")},
         )
-        # Creating a temporary file path and writing the column name to it followed by each row of data
         if rows:
-            csv_file_path = f"/tmp/{table_name}.csv"
-            with open(csv_file_path, "w", newline="") as file:
-                writer = csv.writer(file)
-                # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
-                column_names = [
-                    col_name[0]
-                    for col_name in db.run(
-                        """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
-                        WHERE table_name = :table ;""",
-                        table=table_name,
-                    )
-                ]
-                writer.writerow(column_names)
-                writer.writerows(rows)
+            column_names = [
+                col_name[0]
+                for col_name in db.run(
+                    """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+                    WHERE table_name = :table ;""",
+                    table=table_name,
+                )
+            ]
+
             s3_key = datetime.strftime(
                 datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
             )
 
-            # Writing the new file to S3 extract bucket:
             try:
-                client.upload_file(csv_file_path, extract_bucket(), s3_key)
+                stream_to_s3(
+                    table_name, rows, column_names, client, extract_bucket(), s3_key
+                )
                 load_status["updated"].append(table_name)
                 logger.info(f"Uploaded {s3_key} to S3.")
             except ClientError as e:
-- 
cgit v1.2.3