aboutsummaryrefslogtreecommitdiffstats
path: root/src/extract_lambda.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/extract_lambda.py')
-rw-r--r--src/extract_lambda.py50
1 files changed, 33 insertions, 17 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 7c6c3d1..f38e24a 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -3,6 +3,7 @@ import json
import logging
import re
from datetime import datetime
+from io import StringIO
import boto3
from botocore.exceptions import ClientError
@@ -139,6 +140,26 @@ def get_latest_timestamp(existing_files):
return max(all_datetimes) if all_datetimes else datetime.min
+def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key):
+ csv_buffer = StringIO()
+ csv_writer = csv.writer(csv_buffer)
+
+ csv_writer.writerow(column_names)
+
+ for row in rows:
+ csv_writer.writerow(row)
+
+ if csv_buffer.tell() > 5 * 1024 * 1024:
+ csv_buffer.seek(0)
+ s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
+ csv_buffer.truncate(0)
+ csv_buffer.seek(0)
+
+ if csv_buffer.tell() > 0:
+ csv_buffer.seek(0)
+ s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
+
+
def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
"""Creates a list of the tables from a database query and
then selects everything from each table in individual queries
@@ -164,29 +185,24 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;",
latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")},
)
- # Creating a temporary file path and writing the column name to it followed by each row of data
if rows:
- csv_file_path = f"/tmp/{table_name}.csv"
- with open(csv_file_path, "w", newline="") as file:
- writer = csv.writer(file)
- # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
- column_names = [
- col_name[0]
- for col_name in db.run(
- """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
- WHERE table_name = :table ;""",
- table=table_name,
- )
- ]
- writer.writerow(column_names)
- writer.writerows(rows)
+ column_names = [
+ col_name[0]
+ for col_name in db.run(
+ """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE table_name = :table ;""",
+ table=table_name,
+ )
+ ]
+
s3_key = datetime.strftime(
datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
)
- # Writing the new file to S3 extract bucket:
try:
- client.upload_file(csv_file_path, extract_bucket(), s3_key)
+ stream_to_s3(
+ table_name, rows, column_names, client, extract_bucket(), s3_key
+ )
load_status["updated"].append(table_name)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
git.ajschof.me — hosted by ajschofield — powered by cgit