diff options
| author | Alex <git@ajschof.me> | 2024-08-15 19:47:50 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-15 19:47:50 +0100 |
| commit | cf23d5d0a217ba91b50aebc0261ae6fa064bfcd4 (patch) | |
| tree | 88050b6810dc096b2096650432b6e32b631b157d /src/extract_lambda.py | |
| parent | 7642266611b370b6e945e132c8e7b26c8d6fe9f3 (diff) | |
| parent | 486fb62af5568a70e22ded622072883758e9ffdf (diff) | |
| download | de-project-bentley-cf23d5d0a217ba91b50aebc0261ae6fa064bfcd4.tar.gz de-project-bentley-cf23d5d0a217ba91b50aebc0261ae6fa064bfcd4.zip | |
Merge pull request #40 from ajschofield/feature/extract_lambda_testing
pr: tests, lambda code & tf changes
Diffstat (limited to 'src/extract_lambda.py')
| -rw-r--r-- | src/extract_lambda.py | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 56b47a6..fb2d7e8 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -6,6 +6,7 @@ from botocore.exceptions import ClientError import logging import json from datetime import datetime +import re logger = logging.getLogger() @@ -117,9 +118,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): in the existing_files dictionary with the same name. If it finds any changes to files, or new tables/files it uploads them to the s3 bucket """ - + ## NEW CODE + all_datetimes = [] + for file_names in existing_files.keys(): + datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2)) + all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S')) + latest_timestamp = max(all_datetimes) + ## END OF NEW CODE + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") - + print(tables) for table in tables: table_name = table[0] rows = db.run(f"SELECT * FROM {table_name};") @@ -128,17 +136,21 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): csv_file_path = f"/tmp/{table_name}.csv" with open(csv_file_path, "w", newline='') as file: writer = csv.writer(file) - column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] writer.writerow(column_names) writer.writerows(rows) - - s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv" + s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') new_csv_content = open(csv_file_path, "r").read() - - - if s3_key not in existing_files or existing_files[s3_key] != new_csv_content: + ## NEW CODE + latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + ## END OF NEW CODE + if existing_files[latest_s3_object_key] != new_csv_content: try: client.upload_file(csv_file_path, 'extract_bucket', s3_key) logger.info(f"Uploaded {s3_key} to S3.") except ClientError as e: - logger.error(f'Error uploading to S3: {e}')
\ No newline at end of file + logger.error(f'Error uploading to S3: {e}') + else: + logger.info(f"No new data.") +
\ No newline at end of file |
