diff options
| author | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-16 14:20:39 +0100 |
|---|---|---|
| committer | Ang Bel <anzelikabelotelova@Anzelikas-MacBook-Air.local> | 2024-08-16 15:33:02 +0100 |
| commit | aba65e0db08625c1ef0d3db6076b54e56e0b45ea (patch) | |
| tree | 8e0f82fcc28a90e84d4573146f9e982d0195c52f /src/extract_lambda.py | |
| parent | c284df39ed7735d736f4fe0f2571ba846b8f6315 (diff) | |
| download | de-project-bentley-aba65e0db08625c1ef0d3db6076b54e56e0b45ea.tar.gz de-project-bentley-aba65e0db08625c1ef0d3db6076b54e56e0b45ea.zip | |
refactor following github actions major risk message
Diffstat (limited to 'src/extract_lambda.py')
| -rw-r--r-- | src/extract_lambda.py | 65 |
1 files changed, 31 insertions, 34 deletions
```diff
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 323d04a..cc09e87 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -30,8 +30,8 @@ def lambda_handler(event, context):
         db = connect_to_database()
         existing_files = list_existing_s3_files()
         any_changes = process_and_upload_tables(db, existing_files)
-
-        if not any_changes:
+
+        if not any_changes['updated']:
             logger.info("No changes detected in the database.")
             return {
                 "statusCode": 200,
@@ -39,8 +39,9 @@ def lambda_handler(event, context):
             }
         else:
             return {
-                "statusCode": 200,
-                "body": json.dumps("CSV files processed and uploaded successfully."),
+                'statusCode': 200,
+                'body': json.dumps(f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{
+                    'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""")
             }
     except Exception as e:
         logger.error(f"Error: {e}")
@@ -124,7 +125,8 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
     in the existing_files dictionary with the same name.
     If it finds any changes to files, or new tables/files it uploads them to the s3 bucket
     """
-    # NEW CODE
+    load_status = {'updated':[],'no change':[]}
+    ## Retrieving the latest file timestamp from S3 extract bucket
     all_datetimes = []
     for file_names in existing_files.keys():
         datetime_str_on_s3 = "".join(
@@ -132,39 +134,34 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
         )
         all_datetimes.append(datetime.strptime(datetime_str_on_s3, "%Y/%m/%d/%H:%M:%S"))
     latest_timestamp = max(all_datetimes)
-    # END OF NEW CODE
-    tables = db.run(
-        "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';"
-    )
-    print(tables)
+    ## Iterating through tables on the database and retrieving only latest changes vs previous file load
+    tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
     for table in tables:
         table_name = table[0]
         rows = db.run(
             f"SELECT * FROM {table_name} WHERE last_updated >= {datetime.strftime(latest_timestamp,'%H-%m-%d %H:%M:%S')};"
         )
-        if rows:
-            csv_file_path = f"/tmp/{table_name}.csv"
-            with open(csv_file_path, "w", newline="") as file:
-                writer = csv.writer(file)
-                # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
-                column_names = [
-                    col_name[0]
-                    for col_name in db.run(
-                        f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';"
-                    )
-                ]
-                writer.writerow(column_names)
-                writer.writerows(rows)
-            s3_key = datetime.strftime(
-                datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
-            )
-
-            try:
-                client.upload_file(csv_file_path, extract_bucket(), s3_key)
-                logger.info(f"Uploaded {s3_key} to S3.")
-            except ClientError as e:
-                logger.error(f"Error uploading to S3: {e}")
-        else:
-            logger.info(f"No new data.")
+        ## Creating a temporary file path and writing the column name to it followed by each row of data
+        if rows:
+            csv_file_path = f"/tmp/{table_name}.csv"
+            with open(csv_file_path, "w", newline='') as file:
+                writer = csv.writer(file)
+                #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+                column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")]
+                writer.writerow(column_names)
+                writer.writerows(rows)
+            s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
+
+            ## Writing the new file to S3 extract bucket:
+            try:
+                client.upload_file(csv_file_path, extract_bucket(), s3_key)
+                load_status['updated'].append(table_name)
+                logger.info(f"Uploaded {s3_key} to S3.")
+            except ClientError as e:
+                logger.error(f'Error uploading to S3: {e}')
+        else:
+            load_status['no change'].append(table_name)
+            logger.info(f"No new data in {table_name} name. Latest data retrieved is from {latest_timestamp}.")
+    return load_status
```
