diff options
| author | Alex <git@ajschof.me> | 2024-08-15 19:51:49 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-08-15 19:51:49 +0100 |
| commit | 0358d491ace5859fe5140e439ec4f1fd80a67bb0 (patch) | |
| tree | a690741aac60485522a578b655e7b32418172f44 /src | |
| parent | 733cb2eae5ab77c71c24747b5652ce135bb4efe4 (diff) | |
| parent | 83a05489bd5ec56f51ede83f52f8babcaa7eef70 (diff) | |
| download | de-project-bentley-0358d491ace5859fe5140e439ec4f1fd80a67bb0.tar.gz de-project-bentley-0358d491ace5859fe5140e439ec4f1fd80a67bb0.zip | |
Merge branch 'development' into chore/gitignore
Diffstat (limited to 'src')
| -rw-r--r-- | src/extract_lambda.py | 166 | ||||
| -rw-r--r-- | src/secrets_manager.py | 48 |
2 files changed, 193 insertions, 21 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py index 7d56c66..fb2d7e8 100644 --- a/src/extract_lambda.py +++ b/src/extract_lambda.py @@ -1,32 +1,156 @@ -from pg8000.native import Connection, Error, DatabaseError, InterfaceError -from dotenv import load_dotenv -import os +from pg8000.native import Connection, DatabaseError, InterfaceError +from dotenv import dotenv_values +import boto3 +import csv +from botocore.exceptions import ClientError +import logging +import json +from datetime import datetime +import re -load_dotenv() -def extract(): +logger = logging.getLogger() +logger.setLevel(logging.INFO) -# temporary credentials for dev- will not have access when uploaded - database = os.getenv('database') - user = os.getenv('user') - password = os.getenv('password') - host = os.getenv('host') - port = os.getenv('port') +class DBConnectionException(Exception): + """Wraps pg8000.native Error or DatabaseError.""" + def __init__(self, e): + """Initialise with provided error message.""" + self.message = str(e) + super().__init__(self.message) +def lambda_handler(event, context): + """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, + and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them + it uses 3 helper functions to achieve these 3 functionalities + """ try: - db = Connection.run( - database=database, - user=user, - password=password, - host=host, - port=port + db = connect_to_database() + existing_files = list_existing_s3_files() + any_changes = process_and_upload_tables(db, existing_files) + + if not any_changes: + logger.info("No changes detected in the database.") + return { + 'statusCode': 200, + 'body': json.dumps('No changes detected, no CSV files were uploaded.') + } + else: + return { + 'statusCode': 200, + 'body': json.dumps('CSV files processed and uploaded successfully.') + } + + except Exception as e: + logger.error(f'Error: {e}') + return { + 'statusCode': 500, + 'body': json.dumps('Internal server error.') + } + + finally: + + if db: + db.close() + +def get_config(path: str = ".env") -> dict: + return dotenv_values(path) + + +def connect_to_database() -> Connection: + try: + config = get_config() + host = config["host"] + port = config["port"] + user = config["user"] + password = config["password"] + database = config["database"] + + return Connection( + database=database, + user=user, + password=password, + host=host, + port=port ) - except DatabaseError as e: - print(e) except InterfaceError as i: - print(i) + logger.error(f'Interface error: {i}') + raise DBConnectionException("Failed to connect to database") + + + +def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')): + """Creates a dictionary and populates it with the + results of listing the contents of the s3 bucket, then + returns the populated dictionary + """ + + existing_files = {} + + try: + response = client.list_objects_v2(Bucket='extract_bucket') + + if 'Contents' in response: + for obj in response['Contents']: + s3_key = obj['Key'] + try: + file_obj = client.get_object(Bucket=bucket_name, Key=s3_key) + file_content = file_obj['Body'].read().decode('utf-8') + existing_files[s3_key] = file_content + except ClientError as e: + logger.error(f'Error retrieving S3 object {s3_key}: {e}') + else: + logger.error('The bucket is empty') + + except ClientError as e: + logger.error(f'Error listing S3 objects: {e}') + + return existing_files + + + +def process_and_upload_tables(db, existing_files, client=boto3.client('s3')): + """Creates a list of the tables from a database query and + then selects everything from each table in individual queries + it then writes each table to CSV files and compares with the item + in the existing_files dictionary with the same name. If it finds any changes + to files, or new tables/files it uploads them to the s3 bucket + """ + ## NEW CODE + all_datetimes = [] + for file_names in existing_files.keys(): + datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2)) + all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S')) + latest_timestamp = max(all_datetimes) + ## END OF NEW CODE + tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';") + print(tables) + for table in tables: + table_name = table[0] + rows = db.run(f"SELECT * FROM {table_name};") + -
\ No newline at end of file + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline='') as file: + writer = csv.writer(file) + #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + new_csv_content = open(csv_file_path, "r").read() + ## NEW CODE + latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv') + ## END OF NEW CODE + if existing_files[latest_s3_object_key] != new_csv_content: + try: + client.upload_file(csv_file_path, 'extract_bucket', s3_key) + logger.info(f"Uploaded {s3_key} to S3.") + except ClientError as e: + logger.error(f'Error uploading to S3: {e}') + else: + logger.info(f"No new data.") +
\ No newline at end of file diff --git a/src/secrets_manager.py b/src/secrets_manager.py new file mode 100644 index 0000000..c0fb61e --- /dev/null +++ b/src/secrets_manager.py @@ -0,0 +1,48 @@ +import boto3 +from botocore.exceptions import ClientError +import json + + +def sm_client(): + sm_client = boto3.client('secretsmanager') + yield sm_client + +def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port): + secret = { + "cohort_id": cohort_id, + "user": user, + "password": password, + "host": host, + "database": database, + "port": port + } + + response = sm_client.create_secret( + Name = secret_name, + SecretString = json.dumps(secret) + ) + + print(response) + return response + +def list_secret(sm_client): + response = sm_client.list_secrets() + secret_dict = response['SecretList'] + secret_names = [] + for items in secret_dict: + secret_names.append(items['Name']) + print(f'{len(secret_names)} secret(s) available') + for name in secret_names: + print(name) + return secret_names + +def retrieve_secrets(sm_client): + response = sm_client.get_secrets( + + ) + + + +#retrieve secret +#so lambda can access totesy db +#so lambda connect to the db and then retrieve the data
\ No newline at end of file |
