diff options
| author | Alex <git@ajschof.me> | 2024-08-20 12:16:00 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-20 12:16:00 +0100 |
| commit | 983430f661bd89a406693d48b464b3120604f2dd (patch) | |
| tree | fe9ac32321c17d2e98d786efaa8df309e8c6d0d4 /src | |
| parent | 7f01f8a66d57d389c58c6e09de7ed0c914e298ce (diff) | |
| parent | 346aadfbf2208a0660ffc09959a91fc2f7b48c79 (diff) | |
| download | de-project-bentley-983430f661bd89a406693d48b464b3120604f2dd.tar.gz de-project-bentley-983430f661bd89a406693d48b464b3120604f2dd.zip | |
Merge pull request #73 from ajschofield/alex/tf-s3-perms
pr: terraform changes & refactoring/adding to extract_lambda
Diffstat (limited to 'src')
| -rw-r--r-- | src/extract_lambda.py | 101 |
1 files changed, 58 insertions, 43 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 9de6214..e9f438b 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -10,9 +10,12 @@ from botocore.exceptions import ClientError from pg8000.native import Connection, InterfaceError, identifier logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - -# DB Exception class +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.INFO, +) class DBConnectionException(Exception): @@ -49,7 +52,7 @@ def lambda_handler(event, context): ), } except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error: {e}", exc_info=True) return {"statusCode": 500, "body": json.dumps("Internal server error.")} finally: if db: @@ -124,6 +127,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 logger.error(f"Error retrieving S3 object {s3_key}: {e}") else: logger.error("The bucket is empty") + return None except ClientError as e: logger.error(f"Error listing S3 objects: {e}") @@ -132,27 +136,18 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3 def get_latest_timestamp(existing_files): - all_datetimes = [] - for file_name in existing_files.keys(): - match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) - if match: - datetime_str = "".join(match.group(1, 2)) - all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")) - return max(all_datetimes) if all_datetimes else datetime.min - - -def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key): - csv_buffer = StringIO() - csv_writer = csv.writer(csv_buffer) - - csv_writer.writerow(column_names) - - for row in rows: - csv_writer.writerow(row) - - csv_buffer.seek(0) + if existing_files: + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else datetime.min - s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key) + return existing_files def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): @@ -169,36 +164,52 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): """ SELECT table_name FROM information_schema.tables - WHERE table_schema='public' + WHERE table_schema='public' AND table_name != '_prisma_migrations' AND table_type='BASE TABLE'; """ ) for table in tables: table_name = table[0] - rows = db.run( - f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;", - latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")}, + base_query = f""" + SELECT * FROM {identifier(table_name)} + WHERE last_updated >= :latest; + """ + latest = ( + { + datetime.strftime( + latest_timestamp if latest_timestamp else datetime(1990, 1, 1), + "%Y-%m-%d %H:%M:%S", + ) + }, ) + logger.info(f"Processing table: {table_name}") + logger.info(f"Latest timestamp: {latest[0]}") + rows = db.run(base_query, latest=latest) + logger.debug(f"Rows: {rows}") + # Creating a temporary file path and writing the column name to it followed by each row of data if rows: - column_names = [ - col_name[0] - for col_name in db.run( - """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS - WHERE table_name = :table ;""", - table=table_name, - ) - ] - - s3_key = ( - f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/" - f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline="") as file: + writer = csv.writer(file) + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" ) + # Writing the new file to S3 extract bucket: try: - stream_to_s3( - table_name, rows, column_names, client, extract_bucket(), s3_key - ) + client.upload_file(csv_file_path, extract_bucket(), s3_key) load_status["updated"].append(table_name) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: @@ -207,3 +218,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): load_status["no change"].append(table_name) logger.info(f"No new data") return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) |
