From e25bee6c1c9db8edaf3197f0dc48fa3c63e61744 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:01:55 +0100 Subject: feat: revert s3 streaming to previous implementation for uploading --- src/extract_lambda.py | 56 +++++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7efaac0..4921034 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -49,7 +49,7 @@ def lambda_handler(event, context): ), } except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: @@ -78,7 +78,7 @@ def retrieve_secrets(): def connect_to_database() -> Connection: try: - secrets = retrieve_secrets() + secrets = json.loads(retrieve_secrets()) host = secrets["host"] port = secrets["port"] user = secrets["user"] @@ -141,20 +141,6 @@ def get_latest_timestamp(existing_files): return max(all_datetimes) if all_datetimes else datetime.min -def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): - csv_buffer = StringIO() - csv_writer = csv.writer(csv_buffer) - - csv_writer.writerow(column_names) - - for row in rows: - csv_writer.writerow(row) - - csv_buffer.seek(0) - - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) - - def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -180,25 +166,29 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) + # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - - s3_key = ( - f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" - f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline="") as file: + writer = csv.writer(file) + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) + # Writing the new file to S3 extract bucket: try: - stream_to_s3( - table_name, rows, column_names, client, extract_bucket(), s3_key - ) + client.upload_file(csv_file_path, extract_bucket(), s3_key) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: @@ -207,3 +197,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): load_status["no change"].append(table_name) logger.info(f"No new data") return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) -- cgit v1.2.3