aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/deploy.yml37
-rw-r--r--.github/workflows/on-commit.yml50
-rw-r--r--.gitignore1
-rw-r--r--DEVNOTES.md100
-rw-r--r--README.md44
-rw-r--r--requirements.txt30
-rw-r--r--src/extract_lambda.py156
-rw-r--r--src/load_lambda.py2
-rw-r--r--src/secrets_manager.py48
-rw-r--r--src/transform_lambda.py2
-rw-r--r--terraform/events.tf91
-rw-r--r--terraform/iam.tf158
-rw-r--r--terraform/lambda.tf83
-rw-r--r--terraform/main.tf26
-rw-r--r--terraform/rds.tf80
-rw-r--r--terraform/s3.tf14
-rw-r--r--terraform/vars.tf38
-rw-r--r--test/test_secrets_manager.py34
-rw-r--r--tests/dummy.txt1
-rw-r--r--tests/dummy_identical.csv4
-rw-r--r--tests/test_extract_lambda.py109
21 files changed, 1007 insertions, 101 deletions
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
new file mode 100644
index 0000000..372d0b3
--- /dev/null
+++ b/.github/workflows/deploy.yml
@@ -0,0 +1,37 @@
+name: deploy-terraform
+
+on:
+ push:
+ branches:
+ - test-ci/** # Adjust the branch based on our deployment strategy
+
+jobs:
+ deploy-terraform:
+ name: Deploy Terraform
+ runs-on: ubuntu-latest
+ environment: test-env
+ steps:
+ - name: Checkout Repo
+ uses: actions/checkout@v4
+
+ - name: Install Terraform
+ uses: hashicorp/setup-terraform@v3
+
+ - name: Configure AWS Credentials
+ uses: aws-actions/configure-aws-credentials@v4
+ with:
+ aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+ aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+ aws-region: ${{ secrets.AWS_REGION }}
+
+ - name: Terraform Init
+ working-directory: terraform
+ run: terraform init
+
+ - name: Terraform Plan
+ working-directory: terraform
+ run: terraform plan
+
+ - name: Terraform Apply
+ working-directory: terraform
+ run: terraform apply --auto-approve \ No newline at end of file
diff --git a/.github/workflows/on-commit.yml b/.github/workflows/on-commit.yml
new file mode 100644
index 0000000..fd9ffb8
--- /dev/null
+++ b/.github/workflows/on-commit.yml
@@ -0,0 +1,50 @@
+name: commit-qc-checks
+
+on:
+ push:
+ branches-ignore:
+ - 'main'
+
+jobs:
+ python-quality-checks:
+ runs-on: ubuntu-latest
+ steps:
+ - uses : actions/checkout@v4
+ - name : 'Python: Setup'
+ uses : actions/setup-python@v5
+ with:
+ python-version: 3.11
+ - name : 'Python: Install Dependencies'
+ run: |
+ python -m pip install --upgrade pip
+ pip install flake8 pylint black bandit safety
+ continue-on-error: true
+ - name : 'Python: Linting'
+ run: |
+ flake8 .
+ find . -name "*.py" | xargs pylint
+ continue-on-error: true
+ - name : 'Python: Formatting'
+ run: |
+ black --check .
+ continue-on-error: true
+ terraform-quality-checks:
+ runs-on: ubuntu-latest
+ steps:
+ - uses : actions/checkout@v4
+ - name: 'Terraform: Setup'
+ uses: hashicorp/setup-terraform@v3
+ with:
+ terraform_version: latest
+ - name: 'Terraform: Formatting'
+ working-directory: terraform
+ run: terraform fmt -check -recursive
+ continue-on-error: true
+ - name: 'Terraform: Initialise'
+ working-directory: terraform
+ run: terraform init -backend=false
+ continue-on-error: true
+ - name: 'Terraform: Validate'
+ working-directory: terraform
+ run: terraform validate
+ continue-on-error: true \ No newline at end of file
diff --git a/.gitignore b/.gitignore
index cd44594..ca15434 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@
# Output Files
*.zip
log*
+__pycache__/
# OS-Related Files
.DS_Store
diff --git a/DEVNOTES.md b/DEVNOTES.md
deleted file mode 100644
index 00b4ddd..0000000
--- a/DEVNOTES.md
+++ /dev/null
@@ -1,100 +0,0 @@
-# Workflow
-
-## References
-
-https://nvie.com/posts/a-successful-git-branching-model/ \
-https://learn.microsoft.com/en-us/azure/devops/repos/git/merging-with-squash?view=azure-devops
-
-
-## Branching
-
-*Based off GitFlow but slightly modified*
-
-- There are two main branches
- - `main` - production-ready code
- - `development` - integration branch for features
- - `staging` - represents the current staging state
-- In addition, there are additional branches
- - Feature branches - for new features and non-urgent bugfixes
- - Hotfix branches - probably won't be used but for critical bugs in production (this is what testing should prevent)
- - Release branches - for preparation of production releases
-
-- Feature branches - e.g. `feature/short-description`
-- Bugfix branches - e.g. `bugfix/short-description`
-- Hotfix branches - e.g. `hotfix/short-description`
-- Release branches - e.g. `release/vX.Y.Z`
-
-### Examples
-```
-feature/add-data-extractor
-bugfix/fix-s3-upload-error
-hotfix/security-patch
-release/v1.0.0
-```
-
-## Environments
-
-1. Development - where active development and initial testing occur
-2. Staging - for integration testing and final checks before production
-3. Production - live and stable environment
-
-## Deployment
-
-1. `main` - represents the current production state
-2. `develop` - represents the integration branch for features and non-urgent fixes
-3. `staging` - represents the current staging state
-
-## Staging Flow
-
-1. Create feature branches from `develop` & merge completed features back into `develop`
-2. When the `develop` branch is ready for testing, create a `staging` branch from `develop`
-3. Deploy the `staging` branch to the staging environment and perform our unit-tests
-4. If staging tests pass, create a `release/vX.Y.Z` branch from `staging`
-5. Make any final adjustments in the `release/vX.Y.Z` branch
-6. Once we have approved the changes in the `release/vX.Y.Z` branch, merge into `main`
-7. Tag the release in `main`
-
-### Notes
-
-- No new features should be included in the release branches and any new features should be merged into `develop` for the next release cycle
-
-## Commit Messages
-
-Please follow the conventional commits specification:
-
-```
-<type>[optional scope]: <description>
-
-<optional body>
-
-[optional footer(s)]
-```
-
-### Types
-- feat: new features
-- fix: bugfixes
-- docs: documentation-only changes
-- style: changes that do not affect the meaning of the code
-- refactor: code changes that neither fix bugs nor adds features
-- perf: code changes that improve performance
-- test: adding tests or correcting existing tests
-- chore: changes to build process or tools/libraries (probably not needed)
-- infra: changes to infrastructure configuration (e.g. Terraform)
-
-### Examples
-```
-feat(extract): add automatic scheduling for data ingestion
-docs: update README with project setup instructions
-```
-
-Configuration files for things such as Terraform isn't native to Conventional Commits, but we can add our own:
-
-```
-infra(tf): update S3 bucket policy
-```
-
-If the Terraform change involves a fix, you may combine `fix` and `infra`:
-
-```
-fix(infra): ...
-```
diff --git a/README.md b/README.md
index 8ae0cb3..6bc75dc 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,43 @@
-# de-project-bentley \ No newline at end of file
+# ToteSys - Data Engineering Project
+
+# Summary
+The project aims to implement a data platform that can extract data from an
+operational database, archive it in a data lake, and make it easily accessible
+within a remodelled OLAP data warehouse.
+
+The solution showcases our skills in:
+
+- Python
+- PostgreSQL
+- Database modelling
+- Amazon Web Services (AWS)
+- Agile methodologies
+
+# Main Objective
+
+Our goal is to create a reliable ETL (Extract, Transform, Load) pipeline that
+can:
+
+1. Extract the data from the `totesys` operational database
+2. Store the data in AWS S3 buckets, that will form our data lake
+3. Transform the data into a suitable schema for the data warehouse
+4. Load the transformed data into the data warehouse hosted on AWS
+
+# Key Features
+
+We aim for the project to have certain features. Some are more prioritised than
+others.
+
+- [ ] Automated data ingestion from `totesys` db
+- [ ] Data storage for ingested and processed data in S3 buckets
+- [ ] Data transformation for data warehouse schema
+- [ ] Automated data loading into the data warehouse schema
+- [ ] Logging and monitoring with CloudWatch
+- [ ] Notifications for errors and successful runs (e.g. successful ingestion)
+- [ ] Visualisation of warehouse data
+
+# Test Coverage
+TBA
+
+# Contributors
+TBA \ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6f383f9
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,30 @@
+asn1crypto==1.5.1
+boto3==1.34.159
+botocore==1.34.159
+certifi==2024.7.4
+cffi==1.17.0
+charset-normalizer==3.3.2
+cryptography==43.0.0
+idna==3.7
+iniconfig==2.0.0
+Jinja2==3.1.4
+jmespath==1.0.1
+MarkupSafe==2.1.5
+moto==5.0.12
+packaging==24.1
+pg8000==1.31.2
+pluggy==1.5.0
+pycparser==2.22
+pytest==8.3.2
+pytest-mock==3.14.0
+python-dateutil==2.9.0.post0
+python-dotenv==1.0.1
+PyYAML==6.0.2
+requests==2.32.3
+responses==0.25.3
+s3transfer==0.10.2
+scramp==1.4.5
+six==1.16.0
+urllib3==2.2.2
+Werkzeug==3.0.3
+xmltodict==0.13.0 \ No newline at end of file
diff --git a/src/extract_lambda.py b/src/extract_lambda.py
new file mode 100644
index 0000000..fb2d7e8
--- /dev/null
+++ b/src/extract_lambda.py
@@ -0,0 +1,156 @@
+from pg8000.native import Connection, DatabaseError, InterfaceError
+from dotenv import dotenv_values
+import boto3
+import csv
+from botocore.exceptions import ClientError
+import logging
+import json
+from datetime import datetime
+import re
+
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
+class DBConnectionException(Exception):
+ """Wraps pg8000.native Error or DatabaseError."""
+
+ def __init__(self, e):
+ """Initialise with provided error message."""
+ self.message = str(e)
+ super().__init__(self.message)
+
+def lambda_handler(event, context):
+ """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket,
+ and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them
+ it uses 3 helper functions to achieve these 3 functionalities
+ """
+ try:
+ db = connect_to_database()
+ existing_files = list_existing_s3_files()
+ any_changes = process_and_upload_tables(db, existing_files)
+
+ if not any_changes:
+ logger.info("No changes detected in the database.")
+ return {
+ 'statusCode': 200,
+ 'body': json.dumps('No changes detected, no CSV files were uploaded.')
+ }
+ else:
+ return {
+ 'statusCode': 200,
+ 'body': json.dumps('CSV files processed and uploaded successfully.')
+ }
+
+ except Exception as e:
+ logger.error(f'Error: {e}')
+ return {
+ 'statusCode': 500,
+ 'body': json.dumps('Internal server error.')
+ }
+
+ finally:
+
+ if db:
+ db.close()
+
+def get_config(path: str = ".env") -> dict:
+ return dotenv_values(path)
+
+
+def connect_to_database() -> Connection:
+ try:
+ config = get_config()
+ host = config["host"]
+ port = config["port"]
+ user = config["user"]
+ password = config["password"]
+ database = config["database"]
+
+ return Connection(
+ database=database,
+ user=user,
+ password=password,
+ host=host,
+ port=port
+ )
+ except InterfaceError as i:
+ logger.error(f'Interface error: {i}')
+ raise DBConnectionException("Failed to connect to database")
+
+
+
+def list_existing_s3_files(bucket_name='extract_bucket', client=boto3.client('s3')):
+ """Creates a dictionary and populates it with the
+ results of listing the contents of the s3 bucket, then
+ returns the populated dictionary
+ """
+
+ existing_files = {}
+
+ try:
+ response = client.list_objects_v2(Bucket='extract_bucket')
+
+ if 'Contents' in response:
+ for obj in response['Contents']:
+ s3_key = obj['Key']
+ try:
+ file_obj = client.get_object(Bucket=bucket_name, Key=s3_key)
+ file_content = file_obj['Body'].read().decode('utf-8')
+ existing_files[s3_key] = file_content
+ except ClientError as e:
+ logger.error(f'Error retrieving S3 object {s3_key}: {e}')
+ else:
+ logger.error('The bucket is empty')
+
+ except ClientError as e:
+ logger.error(f'Error listing S3 objects: {e}')
+
+ return existing_files
+
+
+
+def process_and_upload_tables(db, existing_files, client=boto3.client('s3')):
+ """Creates a list of the tables from a database query and
+ then selects everything from each table in individual queries
+ it then writes each table to CSV files and compares with the item
+ in the existing_files dictionary with the same name. If it finds any changes
+ to files, or new tables/files it uploads them to the s3 bucket
+ """
+ ## NEW CODE
+ all_datetimes = []
+ for file_names in existing_files.keys():
+ datetime_str_on_s3 = ''.join(re.search(r'\/(.+/).+_(.+)\.csv',file_names).group(1,2))
+ all_datetimes.append(datetime.strptime(datetime_str_on_s3, '%Y/%m/%d/%H:%M:%S'))
+ latest_timestamp = max(all_datetimes)
+ ## END OF NEW CODE
+
+ tables = db.run("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';")
+ print(tables)
+ for table in tables:
+ table_name = table[0]
+ rows = db.run(f"SELECT * FROM {table_name};")
+
+
+ csv_file_path = f"/tmp/{table_name}.csv"
+ with open(csv_file_path, "w", newline='') as file:
+ writer = csv.writer(file)
+ #column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")]
+ column_names = [col_name[0] for col_name in db.run(f"SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = '{table_name}';")]
+ writer.writerow(column_names)
+ writer.writerows(rows)
+ s3_key = datetime.strftime(datetime.today(),f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
+ new_csv_content = open(csv_file_path, "r").read()
+ ## NEW CODE
+ latest_s3_object_key = datetime.strftime(latest_timestamp,f'{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv')
+ ## END OF NEW CODE
+ if existing_files[latest_s3_object_key] != new_csv_content:
+ try:
+ client.upload_file(csv_file_path, 'extract_bucket', s3_key)
+ logger.info(f"Uploaded {s3_key} to S3.")
+ except ClientError as e:
+ logger.error(f'Error uploading to S3: {e}')
+ else:
+ logger.info(f"No new data.")
+ \ No newline at end of file
diff --git a/src/load_lambda.py b/src/load_lambda.py
new file mode 100644
index 0000000..6ee681f
--- /dev/null
+++ b/src/load_lambda.py
@@ -0,0 +1,2 @@
+def lambda_handler():
+ pass \ No newline at end of file
diff --git a/src/secrets_manager.py b/src/secrets_manager.py
new file mode 100644
index 0000000..c0fb61e
--- /dev/null
+++ b/src/secrets_manager.py
@@ -0,0 +1,48 @@
+import boto3
+from botocore.exceptions import ClientError
+import json
+
+
+def sm_client():
+ sm_client = boto3.client('secretsmanager')
+ yield sm_client
+
+def create_secret(sm_client, secret_name, cohort_id, user, password, host, database, port):
+ secret = {
+ "cohort_id": cohort_id,
+ "user": user,
+ "password": password,
+ "host": host,
+ "database": database,
+ "port": port
+ }
+
+ response = sm_client.create_secret(
+ Name = secret_name,
+ SecretString = json.dumps(secret)
+ )
+
+ print(response)
+ return response
+
+def list_secret(sm_client):
+ response = sm_client.list_secrets()
+ secret_dict = response['SecretList']
+ secret_names = []
+ for items in secret_dict:
+ secret_names.append(items['Name'])
+ print(f'{len(secret_names)} secret(s) available')
+ for name in secret_names:
+ print(name)
+ return secret_names
+
+def retrieve_secrets(sm_client):
+ response = sm_client.get_secrets(
+
+ )
+
+
+
+#retrieve secret
+#so lambda can access totesy db
+#so lambda connect to the db and then retrieve the data \ No newline at end of file
diff --git a/src/transform_lambda.py b/src/transform_lambda.py
new file mode 100644
index 0000000..6ee681f
--- /dev/null
+++ b/src/transform_lambda.py
@@ -0,0 +1,2 @@
+def lambda_handler():
+ pass \ No newline at end of file
diff --git a/terraform/events.tf b/terraform/events.tf
new file mode 100644
index 0000000..263141f
--- /dev/null
+++ b/terraform/events.tf
@@ -0,0 +1,91 @@
+resource "random_string" "eventbridge_suffix" {
+ length = 8
+ special = false
+ upper = false
+}
+
+resource "random_string" "s3_ingestion_suffix" {
+ length = 8
+ special = false
+ upper = false
+}
+
+resource "random_string" "s3_transform_suffix" {
+ length = 8
+ special = false
+ upper = false
+}
+
+resource "aws_cloudwatch_event_rule" "lambda_trigger" {
+ name = "lambda-scheduled-trigger"
+ description = "Schedule to trigger the Lambda function"
+ schedule_expression = "rate(30 minutes)"
+}
+
+resource "aws_cloudwatch_event_target" "extract_lambda_cw_event" {
+ rule = aws_cloudwatch_event_rule.lambda_trigger.name
+ target_id = "TargetFunctionV1"
+ arn = aws_lambda_function.extract_lambda.arn #replaced lambda name placeholder
+ depends_on = [aws_lambda_permission.allow_eventbridge]
+}
+
+resource "aws_lambda_permission" "allow_eventbridge" {
+ statement_id = "AllowExecutionFromEventBridge${random_string.eventbridge_suffix.result}"
+ action = "lambda:InvokeFunction"
+ function_name = aws_lambda_function.extract_lambda.function_name
+ principal = "events.amazonaws.com"
+ source_arn = aws_cloudwatch_event_rule.lambda_trigger.arn
+
+ lifecycle {
+ replace_triggered_by = [random_string.eventbridge_suffix]
+ }
+}
+
+# below is step function 1
+resource "aws_lambda_permission" "allow_s3_ingestion" {
+ statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_ingestion_suffix.result}"
+ action = "lambda:InvokeFunction"
+ function_name = aws_lambda_function.transform_lambda.function_name #replaced lambda name placeholder
+ principal = "s3.amazonaws.com"
+ source_arn = aws_s3_bucket.extract_bucket.arn #replaced bucket name placeholder
+
+ lifecycle {
+ replace_triggered_by = [random_string.s3_ingestion_suffix]
+ }
+}
+
+
+resource "aws_s3_bucket_notification" "extract_bucket_notification" {
+ bucket = aws_s3_bucket.extract_bucket.id #replaced bucket name placeholder
+
+ lambda_function {
+ events = ["s3:ObjectCreated:*"]
+ lambda_function_arn = aws_lambda_function.transform_lambda.arn #replaced lambda name placeholder
+ }
+
+ depends_on = [aws_lambda_permission.allow_s3_ingestion]
+}
+
+resource "aws_lambda_permission" "allow_s3_transform_bucket" {
+ statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_transform_suffix.result}"
+ action = "lambda:InvokeFunction"
+ function_name = aws_lambda_function.transform_lambda.function_name #replaced lambda name placeholder
+ principal = "s3.amazonaws.com"
+ source_arn = aws_s3_bucket.transform_bucket.arn #replaced bucket name placeholder
+
+ lifecycle {
+ replace_triggered_by = [random_string.s3_transform_suffix]
+ }
+}
+
+
+resource "aws_s3_bucket_notification" "transform_bucket_notification" {
+ bucket = aws_s3_bucket.transform_bucket.id #replaced bucket name placeholder
+
+ lambda_function {
+ events = ["s3:ObjectCreated:*"]
+ lambda_function_arn = aws_lambda_function.transform_lambda.arn #replaced lambda name placeholder
+ }
+
+ depends_on = [aws_lambda_permission.allow_s3_transform_bucket]
+}
diff --git a/terraform/iam.tf b/terraform/iam.tf
new file mode 100644
index 0000000..0e5fa6d
--- /dev/null
+++ b/terraform/iam.tf
@@ -0,0 +1,158 @@
+# Description: This file contains the IAM roles and policies for the lambda functions
+########################################################################
+# IAM MULTI-ROLE SETUP
+########################################################################
+
+# DEFINE MULTI-SERVICE ROLE (lambda, s3, cloudwatch, events)
+resource "aws_iam_role" "multi_service_role" {
+ name = "multi_service_role"
+
+ assume_role_policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [
+ {
+ Action = "sts:AssumeRole"
+ Effect = "Allow"
+ Principal = {
+ Service = [
+ "lambda.amazonaws.com",
+ "scheduler.amazonaws.com"
+ ]
+ }
+ }
+ ]
+ })
+}
+
+
+########################################################################
+# S3 SETUP
+# Description: allows allows retention/tagging/access control settings
+# Lambda IAM Policy for S3 Write
+########################################################################
+
+# S3 DEFINE POLICY
+data "aws_iam_policy_document" "s3_data_policy_doc" {
+ statement {
+ actions = [
+ "s3:PutObject",
+ "s3:PutObjectRetention",
+ "s3:PutObjectTagging",
+ "s3:PutObjectAcl"
+ ]
+ resources = [
+ "${aws_s3_bucket.extract_bucket.arn}/*",
+ "${aws_s3_bucket.transform_bucket.arn}/*",
+ "${aws_s3_bucket.lambda_code_bucket.arn}/*",
+ ]
+ }
+}
+
+
+########################################################################
+# LAMBDA SETUP
+# Description: Allows Lambda permission to write to Cloudwatch logs
+########################################################################
+
+resource "aws_iam_policy" "lambda_execution_policy" {
+ name = "lambda_execution_policy"
+ path = "/"
+ description = "IAM policy for Lambda execution"
+
+ policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [
+ {
+ Effect = "Allow"
+ Action = [
+ "lambda:InvokeFunction",
+ "lambda:GetFunction"
+ ]
+ Resource = "*"
+ }
+ ]
+ }
+ )
+}
+
+########################################################################
+# CLOUDWATCH SETUP
+# Description: Give permission for Lambda to write to CloudWatch logs
+########################################################################
+
+data "aws_iam_policy_document" "cw_document" {
+ statement {
+ actions = ["logs:CreateLogGroup"]
+ resources = [
+ "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:*"
+ ]
+ }
+
+ statement {
+ actions = [
+ "logs:CreateLogStream",
+ "logs:CreateLogGroup",
+ "logs:PutLogEvents"
+ ]
+ resources = [
+ "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*"
+ ]
+ }
+}
+
+resource "aws_iam_policy" "cw_policy" {
+ name = "cw_policy"
+ policy = data.aws_iam_policy_document.cw_document.json
+}
+
+########################################################################
+# POLICY WRITE & ATTACH
+########################################################################
+
+# S3 WRITE POLICY
+resource "aws_iam_policy" "s3_write_policy" {
+ policy = data.aws_iam_policy_document.s3_data_policy_doc.json
+}
+
+resource "aws_iam_role_policy_attachment" "s3_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.s3_write_policy.arn
+}
+
+resource "aws_iam_role_policy_attachment" "lambda_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.lambda_execution_policy.arn
+}
+
+resource "aws_iam_role_policy_attachment" "cw_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.cw_policy.arn
+}
+
+###################
+# EVENTS POLICIES #
+###################
+
+data "aws_iam_policy_document" "cloudwatch_events_policy" {
+ statement {
+ actions = [
+ "events:PutRule",
+ "events:PutTargets",
+ "events:RemoveTargets",
+ "events:DeleteRule",
+ "events:PutEvents"
+ ]
+ resources = ["*"]
+ effect = "Allow"
+ }
+}
+
+resource "aws_iam_policy" "cloudwatch_events_policy" {
+ name = "cloudwatch_events_policy"
+ policy = data.aws_iam_policy_document.cloudwatch_events_policy.json
+}
+
+resource "aws_iam_role_policy_attachment" "cloudwatch_events_attachment" {
+ role = aws_iam_role.multi_service_role.name
+ policy_arn = aws_iam_policy.cloudwatch_events_policy.arn
+}
diff --git a/terraform/lambda.tf b/terraform/lambda.tf
new file mode 100644
index 0000000..72d1306
--- /dev/null
+++ b/terraform/lambda.tf
@@ -0,0 +1,83 @@
+# Extract Lambda Function
+data "archive_file" "extract_lambda_zip" {
+ type = "zip"
+ source_file = "${path.module}/../src/extract_lambda.py"
+ output_path = "${path.module}/../extract_function.zip"
+}
+resource "aws_s3_object" "extract_lambda_code" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "${var.extract_lambda_name}/extract_function.zip"
+ source = data.archive_file.extract_lambda_zip.output_path
+ etag = filemd5(data.archive_file.extract_lambda_zip.output_path)
+}
+
+resource "aws_lambda_function" "extract_lambda" {
+ function_name = var.extract_lambda_name
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.extract_lambda_code.key
+ role = aws_iam_role.multi_service_role.arn
+ handler = "extract_lambda.extract"
+ runtime = "python3.11"
+
+ lifecycle {
+ create_before_destroy = true
+ }
+
+ depends_on = [aws_s3_object.extract_lambda_code]
+}
+
+# Transform Lambda Function
+data "archive_file" "transform_lambda_zip" {
+ type = "zip"
+ source_file = "${path.module}/../src/transform_lambda.py"
+ output_path = "${path.module}/../transform_function.zip"
+}
+resource "aws_s3_object" "transform_lambda_code" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "${var.transform_lambda_name}/transform_function.zip"
+ source = data.archive_file.transform_lambda_zip.output_path
+ etag = filemd5(data.archive_file.transform_lambda_zip.output_path)
+}
+
+resource "aws_lambda_function" "transform_lambda" {
+ function_name = var.transform_lambda_name
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.transform_lambda_code.key
+ role = aws_iam_role.multi_service_role.arn
+ handler = "transform_lambda.transform"
+ runtime = "python3.11"
+
+ lifecycle {
+ create_before_destroy = true
+ }
+
+ depends_on = [aws_s3_object.transform_lambda_code]
+}
+
+# Load Lambda Function
+data "archive_file" "load_lambda_zip" {
+ type = "zip"
+ source_file = "${path.module}/../src/load_lambda.py"
+ output_path = "${path.module}/../load_function.zip"
+}
+resource "aws_s3_object" "load_lambda_code" {
+ bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ key = "${var.load_lambda_name}/load_function.zip"
+ source = data.archive_file.load_lambda_zip.output_path
+ etag = filemd5(data.archive_file.load_lambda_zip.output_path)
+}
+
+resource "aws_lambda_function" "load_lambda" {
+ function_name = var.load_lambda_name
+ s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket
+ s3_key = aws_s3_object.load_lambda_code.key
+ role = aws_iam_role.multi_service_role.arn
+ handler = "load_lambda.load"
+ runtime = "python3.11"
+
+ lifecycle {
+ create_before_destroy = true
+ }
+
+ depends_on = [aws_s3_object.load_lambda_code]
+}
diff --git a/terraform/main.tf b/terraform/main.tf
new file mode 100644
index 0000000..3b06701
--- /dev/null
+++ b/terraform/main.tf
@@ -0,0 +1,26 @@
+terraform {
+ required_providers {
+ aws = {
+ source = "hashicorp/aws"
+ version = "~>5.0"
+ }
+ }
+ backend "s3" {
+ bucket = "bentley-project-secrets"
+ key = "bentley-project/terraform.tfstate"
+ region = "eu-west-2"
+ }
+}
+
+provider "aws" {
+ region = "eu-west-2"
+ default_tags {
+ tags = {
+ ProjectName = "Terrific-Totes"
+ Team = "Team-Bentley"
+ Environment = "Dev"
+ GitHubRepo = "de-project-bentley"
+ ManagedBy = "Terraform"
+ }
+ }
+}
diff --git a/terraform/rds.tf b/terraform/rds.tf
new file mode 100644
index 0000000..88783b7
--- /dev/null
+++ b/terraform/rds.tf
@@ -0,0 +1,80 @@
+data "aws_availability_zones" "available" {}
+
+module "vpc" {
+ source = "terraform-aws-modules/vpc/aws"
+ version = "5.12.1"
+
+ name = var.project_name
+ cidr = "10.0.0.0/16"
+ azs = data.aws_availability_zones.available.names
+ public_subnets = ["10.0.4.0/24", "10.0.5.0/24", "10.0.6.0/24"]
+ enable_dns_hostnames = true
+ enable_dns_support = true
+}
+
+resource "aws_db_subnet_group" "Terrific-Totes-sub-gr" {
+ name = "tt-db-subnet"
+ subnet_ids = module.vpc.public_subnets
+
+ tags = {
+ Name = "${var.project_name}"
+ }
+}
+
+resource "aws_security_group" "rds" {
+ name = "${var.project_name}-rds"
+ vpc_id = module.vpc.vpc_id
+
+ ingress {
+ from_port = 5432
+ to_port = 5432
+ protocol = "tcp"
+ cidr_blocks = ["0.0.0.0/0"]
+ }
+
+ egress {
+ from_port = 5432
+ to_port = 5432
+ protocol = "tcp"
+ cidr_blocks = ["0.0.0.0/0"]
+ }
+
+ tags = {
+ Name = "${var.project_name}-rds"
+ }
+}
+
+resource "aws_db_parameter_group" "Terrific-Totes-param-gr" {
+ name = "tt-db-param"
+ family = "postgres14"
+
+ parameter {
+ name = "log_connections"
+ value = "1"
+ }
+}
+
+resource "aws_db_instance" "terrific-totes-rds" {
+ db_name = var.project_name
+ instance_class = "db.t3.micro"
+ allocated_storage = 5
+ engine = "postgres"
+ engine_version = "14.10"
+ username = "totes"
+ password = "totes123"
+ # username = "user credentials for the root user" # we could use .env here
+ # password = "user password for the root user" # we could use .env here
+ ### alternatively to providing username nad password we can specify:
+ # resource "aws_kms_key" "example_key" {
+ # description = "Example KMS Key"
+ # }
+ # within the resource:
+ # manage_master_user_password = true
+ # master_user_secret_kms_key_id = aws_kms_key.example.key_id
+ # }
+ db_subnet_group_name = aws_db_subnet_group.Terrific-Totes-sub-gr.name
+ vpc_security_group_ids = [aws_security_group.rds.id]
+ parameter_group_name = aws_db_parameter_group.Terrific-Totes-param-gr.name
+ publicly_accessible = false
+ skip_final_snapshot = true
+}
diff --git a/terraform/s3.tf b/terraform/s3.tf
new file mode 100644
index 0000000..d5cdee3
--- /dev/null
+++ b/terraform/s3.tf
@@ -0,0 +1,14 @@
+### EXTRACT BUCKET SET-UP
+resource "aws_s3_bucket" "extract_bucket" {
+ bucket_prefix = "${var.s3_extract_bucket_name}-"
+}
+
+### TRANSFORM BUCKET SET-UP
+resource "aws_s3_bucket" "transform_bucket" {
+ bucket_prefix = "${var.s3_transform_bucket_name}-"
+}
+
+### LAMBDA BUCKET
+resource "aws_s3_bucket" "lambda_code_bucket" {
+ bucket_prefix = "${var.s3_code_bucket_name}-"
+}
diff --git a/terraform/vars.tf b/terraform/vars.tf
new file mode 100644
index 0000000..3c88731
--- /dev/null
+++ b/terraform/vars.tf
@@ -0,0 +1,38 @@
+variable "s3_extract_bucket_name" {
+ type = string
+ default = "extract-bucket"
+}
+
+variable "s3_transform_bucket_name" {
+ type = string
+ default = "transform-bucket"
+}
+
+variable "s3_code_bucket_name" {
+ type = string
+ default = "lambda-bucket"
+}
+
+variable "extract_lambda_name" {
+ type = string
+ default = "extract-lambda"
+}
+
+variable "transform_lambda_name" {
+ type = string
+ default = "transform-lambda"
+}
+
+variable "load_lambda_name" {
+ type = string
+ default = "load-lambda"
+}
+
+variable "project_name" {
+ type = string
+ default = "tt"
+}
+
+data "aws_caller_identity" "current" {}
+
+data "aws_region" "current" {}
diff --git a/test/test_secrets_manager.py b/test/test_secrets_manager.py
new file mode 100644
index 0000000..86533bc
--- /dev/null
+++ b/test/test_secrets_manager.py
@@ -0,0 +1,34 @@
+from src.secrets_manager import sm_client, create_secret, list_secret
+import boto3
+from moto import mock_aws
+import json
+import pytest
+import os
+
+pytest.fixture(scope='class')
+def mock_aws_credentials():
+ """Mocked AWS Credentials for moto."""
+ os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+ os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+ os.environ["AWS_SECURITY_TOKEN"] = "testing"
+ os.environ["AWS_SESSION_TOKEN"] = "testing"
+ os.environ["AWS_DEFAULT_REGION"] = "eu-west-2"
+
+@pytest.fixture(scope='class')
+def mock_sm_client(mock_aws_credentials):
+ with mock_aws():
+ yield boto3.client('secretsmanager')
+
+
+def test_create_secret_stores_secrets(mock_sm_client):
+ cohort_id = "test_cohort_id"
+ user = "test_user_id"
+ password = "test_password"
+ host = "test_host"
+ database = "test_database"
+ port = "test_port"
+
+ secret_name = "test_secret"
+ response = create_secret(mock_sm_client, secret_name, cohort_id, user, password, host, database, port)
+
+ assert response['Name'] == secret_name \ No newline at end of file
diff --git a/tests/dummy.txt b/tests/dummy.txt
new file mode 100644
index 0000000..af27ff4
--- /dev/null
+++ b/tests/dummy.txt
@@ -0,0 +1 @@
+This is a test file. \ No newline at end of file
diff --git a/tests/dummy_identical.csv b/tests/dummy_identical.csv
new file mode 100644
index 0000000..fdd8993
--- /dev/null
+++ b/tests/dummy_identical.csv
@@ -0,0 +1,4 @@
+Food_type,Flavour,Colour
+Vegetable,Sour,Green
+Berry,Sweet,Red
+
diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py
new file mode 100644
index 0000000..e94a8a4
--- /dev/null
+++ b/tests/test_extract_lambda.py
@@ -0,0 +1,109 @@
+import pytest
+import boto3
+from moto import mock_aws
+from unittest.mock import patch, MagicMock
+from unittest import TestCase
+from src.extract_lambda import list_existing_s3_files, connect_to_database, DBConnectionException, process_and_upload_tables
+import os
+import logging
+
+@pytest.fixture(scope='class')
+def mock_config():
+ env_vars = {
+ "host": "abc",
+ "port": "5432",
+ "user": "def",
+ "password": "password",
+ "database": "db",
+ }
+ with patch("src.extract_lambda.get_config", return_value=env_vars) as mock_config:
+ yield mock_config
+
+
+@pytest.fixture(scope='class')
+def aws_credentials():
+ os.environ["AWS_ACCESS_KEY_ID"] = 'testing'
+ os.environ["AWS_SECRET_ACCESS_KEY"] = 'testing'
+ os.environ["AWS_SECURIT_TOKEN"] = 'testing'
+ os.environ["AWS_SESSION_TOKEN"] = 'testing'
+ os.environ["AWS_DEFAULT_REGION"]= 'eu-west-2'
+
+@pytest.fixture(scope='class')
+def s3_client(aws_credentials):
+ with mock_aws():
+ yield boto3.client('s3')
+
+class TestListExistingS3Files:
+ def test_error_if_no_bucket(self, s3_client, caplog):
+
+ logger = logging.getLogger()
+ logger.info('Testing now.')
+ caplog.set_level(logging.ERROR)
+ list_existing_s3_files(client=s3_client)
+ assert 'Error listing S3 objects' in caplog.text
+
+ def test_error_if_bucket_is_empty(self, s3_client, caplog):
+
+ s3_client.create_bucket(Bucket='extract_bucket',
+ CreateBucketConfiguration={
+ 'LocationConstraint': 'eu-west-2'
+ })
+ list_existing_s3_files(client=s3_client)
+ assert 'The bucket is empty' in caplog.text
+
+ def test_error_retrieving_object(self, s3_client, caplog):
+ s3_client.upload_file('tests/dummy.txt', 'extract_bucket', 'dummy.txt')
+ list_existing_s3_files(bucket_name='test_bucket', client=s3_client)
+
+ assert 'Error retrieving S3 object ' in caplog.text
+
+ def test_retrieves_file_content(self, s3_client, caplog):
+ result = list_existing_s3_files(client=s3_client)
+
+ assert list(result.values()) == ['This is a test file.']
+
+class TestConnectToDatabase:
+ def test_connect_to_database(mock_conn, mock_config):
+ with patch("src.extract_lambda.Connection", autospec=True) as mock_conn:
+ connect_to_database()
+ mock_conn.assert_called_with(
+ host="abc", user="def", port="5432", password="password", database="db"
+ )
+
+ def test_database_error(self, mock_config):
+ with pytest.raises(DBConnectionException):
+ connect_to_database()
+
+ def test_logs_interface_error(self, caplog):
+ logger = logging.getLogger()
+ logger.info('Testing now.')
+ caplog.set_level(logging.ERROR)
+ with pytest.raises(DBConnectionException):
+ connect_to_database()
+ assert 'Interface error' in caplog.text
+'''
+class TestProcessAndUploadTables:
+ def test_error_process_and_upload_tables(mock_conn, mock_config, s3_client, caplog):
+ logger = logging.getLogger()
+ logger.info('Testing now.')
+ caplog.set_level(logging.ERROR)
+ ####
+ queries = ["SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';",
+ "SELECT * FROM Fruits;",
+ "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits'"]
+ return_values = [[['Fruits']],
+ [['Vegetable','Sour','Green'],['Berry','Sweet','Red']],
+ [['Food_type'],['Flavour'],['Colour']]]
+ vals = dict(zip(queries,return_values))
+
+ ####
+ with patch('src.extract_lambda.connect_to_database') as mock_db:
+ mock_db().run.side_effects = return_values
+ s3_key = 'Fruits/2024/08/15/Fruits_16:46:30.csv'
+ existing_files = {s3_key: 'Food_type,Flavour,Colour\nFruit,Sour,Green\nBerry,Sweet,Red'}
+ s3_client.create_bucket(Bucket='extract_bucket',
+ CreateBucketConfiguration={'LocationConstraint': 'eu-west-2'})
+ s3_client.upload_file('tests/dummy_identical.csv', 'extract_bucket', s3_key)
+ process_and_upload_tables(mock_db(), existing_files, client=s3_client)
+ assert 'No new data.' in caplog.text
+''' \ No newline at end of file
git.ajschof.me — hosted by ajschofield — powered by cgit