aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAng Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local>2024-08-16 14:20:39 +0100
committerAng Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local>2024-08-16 15:33:02 +0100
commitaba65e0db08625c1ef0d3db6076b54e56e0b45ea (patch)
tree8e0f82fcc28a90e84d4573146f9e982d0195c52f
parentc284df39ed7735d736f4fe0f2571ba846b8f6315 (diff)
downloadde-project-bentley-aba65e0db08625c1ef0d3db6076b54e56e0b45ea.tar.gz
de-project-bentley-aba65e0db08625c1ef0d3db6076b54e56e0b45ea.zip
refactor following github actions major risk message
-rw-r--r--src/extract_lambda.py65
1 files changed, 31 insertions, 34 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 323d04a..cc09e87 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -30,8 +30,8 @@ def lambda_handler(event, context):
db = connect_to_database()
existing_files = list_existing_s3_files()
any_changes = process_and_upload_tables(db, existing_files)
-
- if not any_changes:
+
+ if not any_changes['updated']:
logger.info("No changes detected in the database.")
return {
"statusCode": 200,
@@ -39,8 +39,9 @@ def lambda_handler(event, context):
}
else:
return {
- "statusCode": 200,
- "body": json.dumps("CSV files processed and uploaded successfully."),
+ 'statusCode': 200,
+ 'body': json.dumps(f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{
+ 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""")
}
except Exception as e:
logger.error(f"Error: {e}")
@@ -124,7 +125,8 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
in the existing_files dictionary with the same name. If it finds any changes
to files, or new tables/files it uploads them to the s3 bucket
"""
- # NEW CODE
+ load_status = {'updated':[],'no change':[]}
+ ## Retrieving the latest file timestamp from S3 extract bucket
all_datetimes = []
for file_names in existing_files.keys():
datetime_str_on_s3 = "".join(
@@ -132,39 +134,34 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
)
all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S"))
latest_timestamp = max(all_datetimes)
- # END OF NEW CODE
- tables = db.run(
- "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';"
- )
- print(tables)
+ ## Iterating through tables on the database and retrieving only latest changes vs previous file load
+ tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
for table in tables:
table_name = table[0]
rows = db.run(
f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};"
)
- if rows:
- csv_file_path = f"/tmp/{table_name}.csv"
- with open(csv_file_path, "w", newline="") as file:
- writer = csv.writer(file)
- # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
- column_names = [
- col_name[0]
- for col_name in db.run(
- f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';"
- )
- ]
- writer.writerow(column_names)
- writer.writerows(rows)
- s3_key = datetime.strftime(
- datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
- )
-
- try:
- client.upload_file(csv_file_path, extract_bucket(), s3_key)
- logger.info(f"Uploaded {s3_key} to S3.")
- except ClientError as e:
- logger.error(f"Error uploading to S3: {e}")
- else:
- logger.info(f"No new data.")
+ ## Creating a temporary file path and writing the column name to it followed by each row of data
+ if rows:
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline='') as file:
+ writer = csv.writer(file)
+ #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
+
+ ## Writing the new file to S3 extract bucket:
+ try:
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
+ load_status['updated'].append(table_name)
+ logger.info(f"Uploaded {s3_key} to S3.")
+ except ClientError as e:
+ logger.error(f'Error uploading to S3: {e}')
+ else:
+ load_status['no change'].append(table_name)
+ logger.info(f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}.")
+ return load_status
git.ajschof.me — hosted by ajschofield — powered by cgit