about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Alex <git@ajschof.me> 2024-08-20 12:16:00 +0100
committer GitHub <noreply@github.com> 2024-08-20 12:16:00 +0100
commit 983430f661bd89a406693d48b464b3120604f2dd (patch)
tree fe9ac32321c17d2e98d786efaa8df309e8c6d0d4
parent 7f01f8a66d57d389c58c6e09de7ed0c914e298ce (diff)
parent 346aadfbf2208a0660ffc09959a91fc2f7b48c79 (diff)
download de-project-bentley-983430f661bd89a406693d48b464b3120604f2dd.tar.gz
de-project-bentley-983430f661bd89a406693d48b464b3120604f2dd.zip
Merge pull request #73 from ajschofield/alex/tf-s3-perms
pr: terraform changes & refactoring/adding to extract_lambda
-rw-r--r-- src/extract_lambda.py 101
-rw-r--r-- terraform/iam.tf 3
-rw-r--r-- terraform/s3.tf 4
3 files changed, 63 insertions, 45 deletions
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
index 9de6214..e9f438b 100644
--- a/src/extract_lambda.py
+++ b/src/extract_lambda.py
@@ -10,9 +10,12 @@ from botocore.exceptions import ClientError
from pg8000.native import Connection, InterfaceError, identifier
logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-
-# DB Exception class
+logging.basicConfig(
+ format="{asctime} - {levelname} - {message}",
+ style="{",
+ datefmt="%Y-%m-%d %H:%M",
+ level=logging.INFO,
+)
class DBConnectionException(Exception):
@@ -49,7 +52,7 @@ def lambda_handler(event, context):
),
}
except Exception as e:
- logger.error(f"Error: {e}")
+ logger.error(f"Error: {e}", exc_info=True)
return {"statusCode": 500, "body": json.dumps("Internal server error.")}
finally:
if db:
@@ -124,6 +127,7 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
logger.error(f"Error retrieving S3 object {s3_key}: {e}")
else:
logger.error("The bucket is empty")
+ return None
except ClientError as e:
logger.error(f"Error listing S3 objects: {e}")
@@ -132,27 +136,18 @@ def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3
def get_latest_timestamp(existing_files):
- all_datetimes = []
- for file_name in existing_files.keys():
- match = re.search(r"\/(.+/).+_(.+)\.csv", file_name)
- if match:
- datetime_str = "".join(match.group(1, 2))
- all_datetimes.append(datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S"))
- return max(all_datetimes) if all_datetimes else datetime.min
-
-
-def stream_to_s3(table_name, rows, column_names, s3_client, bucket_name, s3_key):
- csv_buffer = StringIO()
- csv_writer = csv.writer(csv_buffer)
-
- csv_writer.writerow(column_names)
-
- for row in rows:
- csv_writer.writerow(row)
-
- csv_buffer.seek(0)
+ if existing_files:
+ all_datetimes = []
+ for file_name in existing_files.keys():
+ match = re.search(r"\/(.+/).+_(.+)\.csv", file_name)
+ if match:
+ datetime_str = "".join(match.group(1, 2))
+ all_datetimes.append(
+ datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S")
+ )
+ return max(all_datetimes) if all_datetimes else datetime.min
- s3_client.upload_fileobj(csv_buffer, bucket_name, s3_key)
+ return existing_files
def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
@@ -169,36 +164,52 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
"""
SELECT table_name
FROM information_schema.tables
- WHERE table_schema='public'
+ WHERE table_schema='public' AND table_name != '_prisma_migrations'
AND table_type='BASE TABLE';
"""
)
for table in tables:
table_name = table[0]
- rows = db.run(
- f"SELECT * FROM {identifier(table_name)} WHERE last_updated >= :latest;",
- latest={datetime.strftime(latest_timestamp, "%Y-%m-%d %H:%M:%S")},
+ base_query = f"""
+ SELECT * FROM {identifier(table_name)}
+ WHERE last_updated >= :latest;
+ """
+ latest = (
+ {
+ datetime.strftime(
+ latest_timestamp if latest_timestamp else datetime(1990, 1, 1),
+ "%Y-%m-%d %H:%M:%S",
+ )
+ },
)
+ logger.info(f"Processing table: {table_name}")
+ logger.info(f"Latest timestamp: {latest[0]}")
+ rows = db.run(base_query, latest=latest)
+ logger.debug(f"Rows: {rows}")
+ # Creating a temporary file path and writing the column name to it followed by each row of data
if rows:
- column_names = [
- col_name[0]
- for col_name in db.run(
- """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
- WHERE table_name = :table ;""",
- table=table_name,
- )
- ]
-
- s3_key = (
- f"{table_name}/{datetime.now().strftime('%Y/%m/%d')}/"
- f"{table_name}_{datetime.now().strftime('%H:%M:%S')}.csv"
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline="") as file:
+ writer = csv.writer(file)
+ # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [
+ col_name[0]
+ for col_name in db.run(
+ """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS
+ WHERE table_name = :table ;""",
+ table=table_name,
+ )
+ ]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(
+ datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv"
)
+ # Writing the new file to S3 extract bucket:
try:
- stream_to_s3(
- table_name, rows, column_names, client, extract_bucket(), s3_key
- )
+ client.upload_file(csv_file_path, extract_bucket(), s3_key)
load_status["updated"].append(table_name)
logger.info(f"Uploaded {s3_key} to S3.")
except ClientError as e:
@@ -207,3 +218,7 @@ def process_and_upload_tables(db, existing_files, client=boto3.client("s3")):
load_status["no change"].append(table_name)
logger.info(f"No new data")
return load_status
+
+
+if __name__ == "__main__":
+ lambda_handler(None, None)
diff --git a/terraform/iam.tf b/terraform/iam.tf
index a8054ca..3ac8c45 100644
--- a/terraform/iam.tf
+++ b/terraform/iam.tf
@@ -40,7 +40,8 @@ data "aws_iam_policy_document" "s3_data_policy_doc" {
"s3:PutObjectRetention",
"s3:PutObjectTagging",
"s3:PutObjectAcl",
- "s3:ListObjects"
+ "s3:ListObjects",
+ "s3:ListObjectsV2"
]
resources = [
"${aws_s3_bucket.extract_bucket.arn}/*",
diff --git a/terraform/s3.tf b/terraform/s3.tf
index d17a4fe..14e8835 100644
--- a/terraform/s3.tf
+++ b/terraform/s3.tf
@@ -4,7 +4,7 @@
resource "aws_s3_bucket" "extract_bucket" {
bucket_prefix = "${var.s3_extract_bucket_name}-"
-
+ force_destroy = true
tags = {
Name = "Ingestion Bucket"
}
@@ -23,6 +23,7 @@ resource "aws_s3_bucket_versioning" "extract_bucket_versioning" {
resource "aws_s3_bucket" "transform_bucket" {
bucket_prefix = "${var.s3_transform_bucket_name}-"
+ force_destroy = true
tags = {
Name = "Transform Bucket"
}
@@ -42,6 +43,7 @@ resource "aws_s3_bucket_versioning" "transform_bucket_versioning" {
resource "aws_s3_bucket" "lambda_code_bucket" {
bucket_prefix = "${var.s3_code_bucket_name}-"
+ force_destroy = true
tags = {
Name = "Lambda Bucket"
}
git.ajschof.me — hosted by ajschofield — powered by cgit