From e25bee6c1c9db8edaf3197f0dc48fa3c63e61744 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:01:55 +0100 Subject: feat: revert s3 streaming to previous implementation for uploading --- src/extract_lambda.py | 56 +++++++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7efaac0..4921034 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -49,7 +49,7 @@ def lambda_handler(event, context): ), } except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: @@ -78,7 +78,7 @@ def retrieve_secrets(): def connect_to_database() -> Connection: try: - secrets = retrieve_secrets() + secrets = json.loads(retrieve_secrets()) host = secrets["host"] port = secrets["port"] user = secrets["user"] @@ -141,20 +141,6 @@ def get_latest_timestamp(existing_files): return max(all_datetimes) if all_datetimes else datetime.min -def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): - csv_buffer = StringIO() - csv_writer = csv.writer(csv_buffer) - - csv_writer.writerow(column_names) - - for row in rows: - csv_writer.writerow(row) - - csv_buffer.seek(0) - - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) - - def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """Creates a list of the tables from a database query and then selects everything from each table in individual queries @@ -180,25 +166,29 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, ) + # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - - s3_key = ( - f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" - f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline="") as file: + writer = csv.writer(file) + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) + # Writing the new file to S3 extract bucket: try: - stream_to_s3( - table_name, rows, column_names, client, extract_bucket(), s3_key - ) + client.upload_file(csv_file_path, extract_bucket(), s3_key) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: @@ -207,3 +197,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): load_status["no change"].append(table_name) logger.info(f"No new data") return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) -- cgit v1.2.3 From 5211751b69a894874945e3a916c33781a327ab10 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:26:26 +0100 Subject: feat: conditional logic for if bucket is empty --- src/extract_lambda.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 4921034..6216446 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -124,6 +124,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 logger.error(f"Error retrieving S3 object {s3_key}: {e}") else: logger.error("The bucket is empty") + return None except ClientError as e: logger.error(f"Error listing S3 objects: {e}") @@ -132,13 +133,18 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 def get_latest_timestamp(existing_files): - all_datetimes = [] - for file_name in existing_files.keys(): - match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) - if match: - datetime_str = "".join(match.group(1, 2)) - all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) - return max(all_datetimes) if all_datetimes else datetime.min + if existing_files: + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else datetime.min + + return existing_files def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): @@ -163,8 +169,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): for table in tables: table_name = table[0] rows = db.run( - f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, + f""" + SELECT * FROM {identifier(table_name)} + WHERE last_updated >= :latest; + """, + latest={ + datetime.strftime( + latest_timestamp if latest_timestamp else datetime(1990, 1, 1), + "%Y-%m-%d %H:%M:%S", + ) + }, ) # Creating a temporary file path and writing the column name to it followed by each row of data if rows: -- cgit v1.2.3 From dc3a7e74ddf549dad05745c64201aaf0d3402213 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:31:25 +0100 Subject: feat: add advanced logging --- src/extract_lambda.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 6216446..9daf662 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -10,8 +10,12 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.INFO, +) # DB Exception class @@ -168,11 +172,13 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): for table in tables: table_name = table[0] - rows = db.run( - f""" + base_query = f""" SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest; - """, + """ + logger.info(f"Processing table: {table_name}") + rows = db.run( + base_query, latest={ datetime.strftime( latest_timestamp if latest_timestamp else datetime(1990, 1, 1), -- cgit v1.2.3 From 35397e8bad42a8c507d1fb13007c6da2f947e851 Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:44:30 +0100 Subject: feat: add additional logging and exclude unnecessary table --- src/extract_lambda.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 9daf662..fe22192 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -165,7 +165,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """ SELECT table_name FROM information_schema.tables - WHERE table_schema='public' + WHERE table_schema='public' AND table_name != '_prisma_migrations' AND table_type='BASE TABLE'; """ ) @@ -176,16 +176,18 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest; """ - logger.info(f"Processing table: {table_name}") - rows = db.run( - base_query, - latest={ + latest = ( + { datetime.strftime( latest_timestamp if latest_timestamp else datetime(1990, 1, 1), "%Y-%m-%d %H:%M:%S", ) }, ) + logger.info(f"Processing table: {table_name}") + logger.info(f"Latest timestamp: {latest[0]}") + rows = db.run(base_query, latest=latest) + logger.info(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3 From be911e22a964bdf7d5a4421cde7d7c6df447ed5c Mon Sep 17 00:00:00 2001 From: Alex Schofield Date: Tue, 20 Aug 2024 11:49:59 +0100 Subject: refactor: change rows output to debug logger output --- src/extract_lambda.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/extract_lambda.py') diff --git a/src/extract_lambda.py b/src/extract_lambda.py index fe22192..e9f438b 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -16,7 +16,6 @@ logging.basicConfig( datefmt="%Y-%m-%d %H:%M", level=logging.INFO, ) -# DB Exception class class DBConnectionException(Exception): @@ -187,7 +186,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): logger.info(f"Processing table: {table_name}") logger.info(f"Latest timestamp: {latest[0]}") rows = db.run(base_query, latest=latest) - logger.info(f"Rows: {rows}") + logger.debug(f"Rows: {rows}") # Creating a temporary file path and writing the column name to it followed by each row of data if rows: csv_file_path = f"/tmp/{table_name}.csv" -- cgit v1.2.3