aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/extract_lambda.py101
1 files changed, 58 insertions, 43 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 9de6214..e9f438b 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -10,9 +10,12 @@ from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError, identifier
logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-
-# DB Exception class
+logging.basicConfig(
+ format="{asctime} - {levelname} - {message}",
+ style="{",
+ datefmt="%Y-%m-%d %H:%M",
+ level=logging.INFO,
+)
class DBConnectionException(Exception):
@@ -49,7 +52,7 @@ def lambda_handler(event, context):
),
}
except Exception as e:
- logger.error(f"Error: {e}")
+ logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
finally:
if db:
@@ -124,6 +127,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
logger.error(f"Error retrieving S3 object {s3_key}: {e}")
else:
logger.error("The bucket is empty")
+ return None
except ClientError as e:
logger.error(f"Error listing S3 objects: {e}")
@@ -132,27 +136,18 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
def get_latest_timestamp(existing_files):
- all_datetimes = []
- for file_name in existing_files.keys():
- match = re.search(r"\/(.+/).+_(.+)\.csv", file_name)
- if match:
- datetime_str = "".join(match.group(1, 2))
- all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S"))
- return max(all_datetimes) if all_datetimes else datetime.min
-
-
-def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key):
- csv_buffer = StringIO()
- csv_writer = csv.writer(csv_buffer)
-
- csv_writer.writerow(column_names)
-
- for row in rows:
- csv_writer.writerow(row)
-
- csv_buffer.seek(0)
+ if existing_files:
+ all_datetimes = []
+ for file_name in existing_files.keys():
+ match = re.search(r"\/(.+/).+_(.+)\.csv", file_name)
+ if match:
+ datetime_str = "".join(match.group(1, 2))
+ all_datetimes.append(
+ datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
+ )
+ return max(all_datetimes) if all_datetimes else datetime.min
- s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
+ return existing_files
def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
@@ -169,36 +164,52 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
"""
SELECT table_name
FROM information_schema.tables
- WHERE table_schema='public'
+ WHERE table_schema='public' AND table_name != '_prisma_migrations'
AND table_type='BASE TABLE';
"""
)
for table in tables:
table_name = table[0]
- rows = db.run(
- f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;",
- latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")},
+ base_query = f"""
+ SELECT * FROM {identifier(table_name)}
+ WHERE last_updated >= :latest;
+ """
+ latest = (
+ {
+ datetime.strftime(
+ latest_timestamp if latest_timestamp else datetime(1990, 1, 1),
+ "%Y-%m-%d %H:%M:%S",
+ )
+ },
)
+ logger.info(f"Processing table: {table_name}")
+ logger.info(f"Latest timestamp: {latest[0]}")
+ rows = db.run(base_query, latest=latest)
+ logger.debug(f"Rows: {rows}")
+ # Creating a temporary file path and writing the column name to it followed by each row of data
if rows:
- column_names = [
- col_name[0]
- for col_name in db.run(
- """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
- WHERE table_name = :table ;""",
- table=table_name,
- )
- ]
-
- s3_key = (
- f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/"
- f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv"
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline="") as file:
+ writer = csv.writer(file)
+ # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [
+ col_name[0]
+ for col_name in db.run(
+ """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE table_name = :table ;""",
+ table=table_name,
+ )
+ ]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(
+ datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
)
+ # Writing the new file to S3 extract bucket:
try:
- stream_to_s3(
- table_name, rows, column_names, client, extract_bucket(), s3_key
- )
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
load_status["updated"].append(table_name)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
@@ -207,3 +218,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
load_status["no change"].append(table_name)
logger.info(f"No new data")
return load_status
+
+
+if __name__ == "__main__":
+ lambda_handler(None, None)
git.ajschof.me — hosted by ajschofield — powered by cgit