aboutsummaryrefslogtreecommitdiffstats
path: root/src/extract_lambda.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/extract_lambda.py')
-rw-r--r--src/extract_lambda.py30
1 files changed, 21 insertions, 9 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 56b47a6..fb2d7e8 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -6,6 +6,7 @@ from botocore.exceptions import ClientError
import logging
import json
from datetime import datetime
+import re
logger = logging.getLogger()
@@ -117,9 +118,16 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
in the existing_files dictionary with the same name. If it finds any changes
to files, or new tables/files it uploads them to the s3 bucket
"""
-
+ ## NEW CODE
+ all_datetimes = []
+ for file_names in existing_files.keys():
+ datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2))
+ all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S'))
+ latest_timestamp = max(all_datetimes)
+ ## END OF NEW CODE
+
tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
-
+ print(tables)
for table in tables:
table_name = table[0]
rows = db.run(f"SELECT * FROM {table_name};")
@@ -128,17 +136,21 @@ def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
csv_file_path = f"/tmp/{table_name}.csv"
with open(csv_file_path, "w", newline='') as file:
writer = csv.writer(file)
- column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")]
writer.writerow(column_names)
writer.writerows(rows)
-
- s3_key = f"{table_name}/{datetime.today().year}/{datetime.today().month}/{datetime.today().day}/{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv"
+ s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
new_csv_content = open(csv_file_path, "r").read()
-
-
- if s3_key not in existing_files or existing_files[s3_key] != new_csv_content:
+ ## NEW CODE
+ latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
+ ## END OF NEW CODE
+ if existing_files[latest_s3_object_key] != new_csv_content:
try:
client.upload_file(csv_file_path, 'extract_bucket', s3_key)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
- logger.error(f'Error uploading to S3: {e}') \ No newline at end of file
+ logger.error(f'Error uploading to S3: {e}')
+ else:
+ logger.info(f"No new data.")
+ \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit