aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/extract_lambda.py56
1 files changed, 25 insertions, 31 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 7efaac0..4921034 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -49,7 +49,7 @@ def lambda_handler(event, context):
),
}
except Exception as e:
- logger.error(f"Error: {e}")
+ logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
finally:
if db:
@@ -78,7 +78,7 @@ def retrieve_secrets():
def connect_to_database() -> Connection:
try:
- secrets = retrieve_secrets()
+ secrets = json.loads(retrieve_secrets())
host = secrets["host"]
port = secrets["port"]
user = secrets["user"]
@@ -141,20 +141,6 @@ def get_latest_timestamp(existing_files):
return max(all_datetimes) if all_datetimes else datetime.min
-def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key):
- csv_buffer = StringIO()
- csv_writer = csv.writer(csv_buffer)
-
- csv_writer.writerow(column_names)
-
- for row in rows:
- csv_writer.writerow(row)
-
- csv_buffer.seek(0)
-
- s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
-
-
def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
"""Creates a list of the tables from a database query and
then selects everything from each table in individual queries
@@ -180,25 +166,29 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;",
latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")},
)
+ # Creating a temporary file path and writing the column name to it followed by each row of data
if rows:
- column_names = [
- col_name[0]
- for col_name in db.run(
- """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
- WHERE table_name = :table ;""",
- table=table_name,
- )
- ]
-
- s3_key = (
- f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/"
- f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv"
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline="") as file:
+ writer = csv.writer(file)
+ # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [
+ col_name[0]
+ for col_name in db.run(
+ """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE table_name = :table ;""",
+ table=table_name,
+ )
+ ]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(
+ datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
)
+ # Writing the new file to S3 extract bucket:
try:
- stream_to_s3(
- table_name, rows, column_names, client, extract_bucket(), s3_key
- )
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
load_status["updated"].append(table_name)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
@@ -207,3 +197,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
load_status["no change"].append(table_name)
logger.info(f"No new data")
return load_status
+
+
+if __name__ == "__main__":
+ lambda_handler(None, None)
git.ajschof.me — hosted by ajschofield — powered by cgit