diff options
| -rw-r--r-- | .github/workflows/deploy.yml | 42 | ||||
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | DEVNOTES.md | 100 | ||||
| -rw-r--r-- | README.md | 52 | ||||
| -rw-r--r-- | requirements.txt | 30 | ||||
| -rwxr-xr-x | scripts/deploy.sh | 56 | ||||
| -rwxr-xr-x | scripts/make_layer_zip.sh | 8 | ||||
| -rw-r--r-- | src/extract_lambda.py | 227 | ||||
| -rw-r--r-- | src/load_lambda.py | 2 | ||||
| -rw-r--r-- | src/transform_lambda.py | 2 | ||||
| -rw-r--r-- | terraform/events.tf | 109 | ||||
| -rw-r--r-- | terraform/iam.tf | 202 | ||||
| -rw-r--r-- | terraform/lambda.tf | 146 | ||||
| -rw-r--r-- | terraform/main.tf | 40 | ||||
| -rw-r--r-- | terraform/s3.tf | 57 | ||||
| -rw-r--r-- | terraform/vars.tf | 53 | ||||
| -rw-r--r-- | test.py | 0 | ||||
| -rw-r--r-- | tests/dummy.txt | 1 | ||||
| -rw-r--r-- | tests/dummy_identical.csv | 4 | ||||
| -rw-r--r-- | tests/test_extract_lambda.py | 247 | ||||
| -rw-r--r-- | tests/test_secrets_manager.py | 84 |
21 files changed, 1363 insertions, 101 deletions
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..5672048 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,42 @@ +name: deploy-terraform + +on: + pull_request: + branches: + - main + push: + branches: + - main + + +jobs: + deploy-terraform: + name: Deploy Terraform + runs-on: ubuntu-latest + #needs: run-checks (must ref on-commit.yml file) + environment: production + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + + - name: Install Terraform + uses: hashicorp/setup-terraform@v3 + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: ${{ secrets.AWS_REGION }} + + - name: Terraform Init + working-directory: terraform + run: terraform init + + - name: Terraform Plan + working-directory: terraform + run: terraform plan + + - name: Terraform Apply + working-directory: terraform + run: terraform apply --auto-approve @@ -10,6 +10,8 @@ # Output Files *.zip log* +__pycache__/ # OS-Related Files .DS_Store +venv
\ No newline at end of file diff --git a/DEVNOTES.md b/DEVNOTES.md deleted file mode 100644 index 00b4ddd..0000000 --- a/DEVNOTES.md +++ /dev/null @@ -1,100 +0,0 @@ -# Workflow - -## References - -https://nvie.com/posts/a-successful-git-branching-model/ \ -https://learn.microsoft.com/en-us/azure/devops/repos/git/merging-with-squash?view=azure-devops - - -## Branching - -*Based off GitFlow but slightly modified* - -- There are two main branches - - `main` - production-ready code - - `development` - integration branch for features - - `staging` - represents the current staging state -- In addition, there are additional branches - - Feature branches - for new features and non-urgent bugfixes - - Hotfix branches - probably won't be used but for critical bugs in production (this is what testing should prevent) - - Release branches - for preparation of production releases - -- Feature branches - e.g. `feature/short-description` -- Bugfix branches - e.g. `bugfix/short-description` -- Hotfix branches - e.g. `hotfix/short-description` -- Release branches - e.g. `release/vX.Y.Z` - -### Examples -``` -feature/add-data-extractor -bugfix/fix-s3-upload-error -hotfix/security-patch -release/v1.0.0 -``` - -## Environments - -1. Development - where active development and initial testing occur -2. Staging - for integration testing and final checks before production -3. Production - live and stable environment - -## Deployment - -1. `main` - represents the current production state -2. `develop` - represents the integration branch for features and non-urgent fixes -3. `staging` - represents the current staging state - -## Staging Flow - -1. Create feature branches from `develop` & merge completed features back into `develop` -2. When the `develop` branch is ready for testing, create a `staging` branch from `develop` -3. Deploy the `staging` branch to the staging environment and perform our unit-tests -4. If staging tests pass, create a `release/vX.Y.Z` branch from `staging` -5. Make any final adjustments in the `release/vX.Y.Z` branch -6. Once we have approved the changes in the `release/vX.Y.Z` branch, merge into `main` -7. Tag the release in `main` - -### Notes - -- No new features should be included in the release branches and any new features should be merged into `develop` for the next release cycle - -## Commit Messages - -Please follow the conventional commits specification: - -``` -<type>[optional scope]: <description> - -<optional body> - -[optional footer(s)] -``` - -### Types -- feat: new features -- fix: bugfixes -- docs: documentation-only changes -- style: changes that do not affect the meaning of the code -- refactor: code changes that neither fix bugs nor adds features -- perf: code changes that improve performance -- test: adding tests or correcting existing tests -- chore: changes to build process or tools/libraries (probably not needed) -- infra: changes to infrastructure configuration (e.g. Terraform) - -### Examples -``` -feat(extract): add automatic scheduling for data ingestion -docs: update README with project setup instructions -``` - -Configuration files for things such as Terraform isn't native to Conventional Commits, but we can add our own: - -``` -infra(tf): update S3 bucket policy -``` - -If the Terraform change involves a fix, you may combine `fix` and `infra`: - -``` -fix(infra): ... -``` @@ -1 +1,51 @@ -# de-project-bentley
\ No newline at end of file +# ToteSys - Data Engineering Project + +[](https://www.python.org/) +[](https://aws.amazon.com/) +[](https://www.terraform.io/) +[](https://www.postgresql.org/) +[](https://github.com/features/actions) + +[](https://github.com/ajschofield/de-project-bentley/actions/workflows/deploy.yml?query=branch%3Amain) +[](https://github.com/ajschofield/de-project-bentley/deployments/production) +# Summary +The project aims to implement a data platform that can extract data from an +operational database, archive it in a data lake, and make it easily accessible +within a remodelled OLAP data warehouse. + +The solution showcases our skills in: + +- Python +- PostgreSQL +- Database modelling +- Amazon Web Services (AWS) +- Agile methodologies + +# Main Objective + +Our goal is to create a reliable ETL (Extract, Transform, Load) pipeline that +can: + +1. Extract the data from the `totesys` operational database +2. Store the data in AWS S3 buckets, that will form our data lake +3. Transform the data into a suitable schema for the data warehouse +4. Load the transformed data into the data warehouse hosted on AWS + +# Key Features + +We aim for the project to have certain features. Some are more prioritised than +others. + +- [ ] Automated data ingestion from `totesys` db +- [ ] Data storage for ingested and processed data in S3 buckets +- [ ] Data transformation for data warehouse schema +- [ ] Automated data loading into the data warehouse schema +- [ ] Logging and monitoring with CloudWatch +- [ ] Notifications for errors and successful runs (e.g. successful ingestion) +- [ ] Visualisation of warehouse data + +# Test Coverage +TBA + +# Contributors +TBA
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6f383f9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,30 @@ +asn1crypto==1.5.1 +boto3==1.34.159 +botocore==1.34.159 +certifi==2024.7.4 +cffi==1.17.0 +charset-normalizer==3.3.2 +cryptography==43.0.0 +idna==3.7 +iniconfig==2.0.0 +Jinja2==3.1.4 +jmespath==1.0.1 +MarkupSafe==2.1.5 +moto==5.0.12 +packaging==24.1 +pg8000==1.31.2 +pluggy==1.5.0 +pycparser==2.22 +pytest==8.3.2 +pytest-mock==3.14.0 +python-dateutil==2.9.0.post0 +python-dotenv==1.0.1 +PyYAML==6.0.2 +requests==2.32.3 +responses==0.25.3 +s3transfer==0.10.2 +scramp==1.4.5 +six==1.16.0 +urllib3==2.2.2 +Werkzeug==3.0.3 +xmltodict==0.13.0
\ No newline at end of file diff --git a/scripts/deploy.sh b/scripts/deploy.sh new file mode 100755 index 0000000..f631bbc --- /dev/null +++ b/scripts/deploy.sh @@ -0,0 +1,56 @@ +# Deploy Script +# Description: Deploy and destroy Terraform +# WARNING: This will most likely destroy any current infrastructure if protections +# are not in place. Be careful! + +# Exit if any command has a non-zero status +set -e + +# Change current directory to terraform folder at the start +cd ../terraform/ + +echo "WARNING: This script will destroy any infrastructure for testing." +echo "It should not be used once a proper deployment has been setup." +echo "Would you like to continue?" + +select yn in "Yes" "No"; do + case $yn in + Yes) + echo "Would you like to destroy the current infrastructure?" + select destroy_1 in "Yes" "No"; do + case $destroy_1 in + Yes) + terraform destroy + break + ;; + No) + echo "Skipping initial destroy..." + break + ;; + esac + done + + terraform apply + + echo "Would you like to destroy the newly-created infrastructure?" + select destroy_2 in "Yes" "No"; do + case $destroy_2 in + Yes) + terraform destroy + break + ;; + No) + echo "Skipping final destroy... Infrastructure will remain." + break + ;; + esac + done + + break + ;; + No) + echo "Operation cancelled..." + exit + ;; + esac +done diff --git a/scripts/make_layer_zip.sh b/scripts/make_layer_zip.sh new file mode 100755 index 0000000..eabe301 --- /dev/null +++ b/scripts/make_layer_zip.sh @@ -0,0 +1,8 @@ +# Description: Make the zip file for the layer + +cd "$(dirname "$0")/.." +mkdir -p python/lib/python3.11/site-packages +pip3 install --upgrade -r requirements.txt -t python/lib/python3.11/site-packages +rm layer.zip +zip -r layer.zip python +rm -r python/ diff --git a/src/extract_lambda.py b/src/extract_lambda.py new file mode 100644 index 0000000..24f0981 --- /dev/null +++ b/src/extract_lambda.py @@ -0,0 +1,227 @@ +import csv +import json +import logging +import re +from datetime import datetime +from io import StringIO + +import boto3 +from botocore.exceptions import ClientError +from pg8000.native import Connection, InterfaceError, identifier + +logger = logging.getLogger(__name__) + +logging.basicConfig( + format="{asctime} - {levelname} - {message}", + style="{", + datefmt="%Y-%m-%d %H:%M", + level=logging.DEBUG, +) + +logging.getLogger("botocore").setLevel(logging.WARNING) + + +class DBConnectionException(Exception): + """Wraps pg8000.native Error or DatabaseError.""" + + def __init__(self, e): + """Initialise with provided error message.""" + self.message = str(e) + super().__init__(self.message) + + +def lambda_handler(event, context): + """This lambda function connects to the Totesys database, lists the contents of the ingestion bucket, + and converts all tables to CSV and if any of those tables do not exist in, or are different to the ones in s3, it uploads them + it uses 3 helper functions to achieve these 3 functionalities + """ + db = None + try: + db = connect_to_database() + existing_files = list_existing_s3_files() + any_changes = process_and_upload_tables(db, existing_files) + + if not any_changes["updated"]: + logger.info("No changes detected in the database.") + return { + "statusCode": 200, + "body": json.dumps("No changes detected, no CSV files were uploaded."), + } + return { + "statusCode": 200, + "body": json.dumps( + f"""CSV files processed for {', '.join(any_changes['updated'])} and uploaded successfully.{ + 'The following tables were not updated: '+', '.join(any_changes['no change']) if any_changes['no change'] else ''}""" + ), + } + except Exception as e: + logger.error(f"Error: {e}", exc_info=True) + return {"statusCode": 500, "body": json.dumps("Internal server error.")} + finally: + if db: + db.close() + + +def retrieve_secrets(): + secret_name = "bentley-secrets" + region_name = "eu-west-2" + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + + try: + get_secret_value_response = client.get_secret_value(SecretId=secret_name) + except ClientError as e: + logger.error(f"Failed to retrieve secret {secret_name}: {str(e)}") + raise e + except KeyError: + logger.error(f"Secret {secret_name} does not contain a SecretString") + raise ValueError(f"Secret {secret_name} does not contain a SecretString") + + return get_secret_value_response["SecretString"] + + +def connect_to_database() -> Connection: + try: + secrets = json.loads(retrieve_secrets()) + host = secrets["host"] + port = secrets["port"] + user = secrets["user"] + password = secrets["password"] + database = secrets["database"] + + return Connection( + database=database, user=user, password=password, host=host, port=port + ) + except InterfaceError as i: + logger.error(f"Interface error: {i}") + raise DBConnectionException("Failed to connect to database") + + +def extract_bucket(client=boto3.client("s3")): + response = client.list_buckets() + extract_bucket_filter = [ + bucket["Name"] for bucket in response["Buckets"] if "extract" in bucket["Name"] + ] + + return extract_bucket_filter[0] + + +def list_existing_s3_files(bucket_name=extract_bucket(), client=boto3.client("s3")): + """Creates a dictionary and populates it with the + results of listing the contents of the s3 bucket, then + returns the populated dictionary + """ + logging.info("Listing existing S3 files") + existing_files = {} + + try: + response = client.list_objects_v2(Bucket=bucket_name) + + if "Contents" in response: + for obj in response["Contents"]: + s3_key = obj["Key"] + try: + file_obj = client.get_object(Bucket=bucket_name, Key=s3_key) + file_content = file_obj["Body"].read().decode("utf-8") + existing_files[s3_key] = file_content + except ClientError as e: + logger.error(f"Error retrieving S3 object {s3_key}: {e}") + else: + logger.error("The bucket is empty") + return None + + except ClientError as e: + logger.error(f"Error listing S3 objects: {e}") + + return existing_files + + +def get_latest_timestamp(existing_files): + if existing_files: + all_datetimes = [] + for file_name in existing_files.keys(): + match = re.search(r"\/(.+/).+_(.+)\.csv", file_name) + if match: + datetime_str = "".join(match.group(1, 2)) + all_datetimes.append( + datetime.strptime(datetime_str, "%Y/%m/%d/%H:%M:%S") + ) + return max(all_datetimes) if all_datetimes else datetime.min + + return existing_files + + +def process_and_upload_tables(db, existing_files, client=boto3.client("s3")): + """Creates a list of the tables from a database query and + then selects everything from each table in individual queries + it then writes each table to CSV files and compares with the item + in the existing_files dictionary with the same name. If it finds any changes + to files, or new tables/files it uploads them to the s3 bucket + """ + load_status = {"updated": [], "no change": []} + latest_timestamp = get_latest_timestamp(existing_files) + + tables = db.run( + """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema='public' AND table_name != '_prisma_migrations' + AND table_type='BASE TABLE'; + """ + ) + + for table in tables: + table_name = table[0] + base_query = f""" + SELECT * FROM {identifier(table_name)} + WHERE last_updated >= :latest; + """ + latest = ( + { + datetime.strftime( + latest_timestamp if latest_timestamp else datetime(1990, 1, 1), + "%Y-%m-%d %H:%M:%S", + ) + }, + ) + logger.info(f"Processing table: {table_name}") + logger.info(f"Latest timestamp: {latest[0]}") + rows = db.run(base_query, latest=latest) + logger.debug(f"Rows: {rows}") + # Creating a temporary file path and writing the column name to it followed by each row of data + if rows: + csv_file_path = f"/tmp/{table_name}.csv" + with open(csv_file_path, "w", newline="") as file: + writer = csv.writer(file) + # column_names = [desc["name"] for desc in db.columns(f"SELECT * FROM {table_name};")] + column_names = [ + col_name[0] + for col_name in db.run( + """SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS + WHERE table_name = :table ;""", + table=table_name, + ) + ] + writer.writerow(column_names) + writer.writerows(rows) + s3_key = datetime.strftime( + datetime.today(), f"{table_name}/%Y/%m/%d/{table_name}_%H:%M:%S.csv" + ) + + # Writing the new file to S3 extract bucket: + try: + client.upload_file(csv_file_path, extract_bucket(), s3_key) + load_status["updated"].append(table_name) + logger.info(f"Uploaded {s3_key} to S3.") + except ClientError as e: + logger.error(f"Error uploading to S3: {e}") + else: + load_status["no change"].append(table_name) + logger.info(f"No new data") + return load_status + + +if __name__ == "__main__": + lambda_handler(None, None) diff --git a/src/load_lambda.py b/src/load_lambda.py new file mode 100644 index 0000000..c6a8e60 --- /dev/null +++ b/src/load_lambda.py @@ -0,0 +1,2 @@ +def lambda_handler(): + pass diff --git a/src/transform_lambda.py b/src/transform_lambda.py new file mode 100644 index 0000000..c6a8e60 --- /dev/null +++ b/src/transform_lambda.py @@ -0,0 +1,2 @@ +def lambda_handler(): + pass diff --git a/terraform/events.tf b/terraform/events.tf new file mode 100644 index 0000000..53ae10a --- /dev/null +++ b/terraform/events.tf @@ -0,0 +1,109 @@ +################# +# Random String # +################# + +resource "random_string" "eventbridge_suffix" { + length = 8 + special = false + upper = false +} + +resource "random_string" "s3_ingestion_suffix" { + length = 8 + special = false + upper = false +} + +resource "random_string" "s3_transform_suffix" { + length = 8 + special = false + upper = false +} + +############################# +# EventBridge Configuration # +############################# + +resource "aws_cloudwatch_event_rule" "lambda_trigger" { + name = "lambda-scheduled-trigger" + description = "Schedule to trigger the Lambda function" + schedule_expression = "rate(30 minutes)" +} + +resource "aws_cloudwatch_event_target" "extract_lambda_cw_event" { + rule = aws_cloudwatch_event_rule.lambda_trigger.name + target_id = "TargetFunctionV1" + arn = aws_lambda_function.extract_lambda.arn + depends_on = [aws_lambda_permission.allow_eventbridge] +} + +resource "aws_lambda_permission" "allow_eventbridge" { + statement_id = "AllowExecutionFromEventBridge${random_string.eventbridge_suffix.result}" + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.extract_lambda.function_name + principal = "events.amazonaws.com" + source_arn = aws_cloudwatch_event_rule.lambda_trigger.arn + + lifecycle { + create_before_destroy = true + replace_triggered_by = [random_string.eventbridge_suffix] + } +} + +######################################## +# S3 Extract Bucket Notification Setup # +######################################## + +resource "aws_lambda_permission" "allow_s3_ingestion" { + statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_ingestion_suffix.result}" + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.transform_lambda.function_name + principal = "s3.amazonaws.com" + source_arn = aws_s3_bucket.extract_bucket.arn + + lifecycle { + create_before_destroy = true + replace_triggered_by = [random_string.s3_ingestion_suffix] + } +} + + +resource "aws_s3_bucket_notification" "extract_bucket_notification" { + bucket = aws_s3_bucket.extract_bucket.id + + lambda_function { + events = ["s3:ObjectCreated:*"] + lambda_function_arn = aws_lambda_function.transform_lambda.arn + } + + depends_on = [aws_lambda_permission.allow_s3_ingestion] +} + +########################################## +# S3 Transform Bucket Notification Setup # +########################################## + +resource "aws_lambda_permission" "allow_s3_transform_bucket" { + statement_id = "AllowS3InvokeLambdaTransform${random_string.s3_transform_suffix.result}" + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.transform_lambda.function_name + principal = "s3.amazonaws.com" + source_arn = aws_s3_bucket.transform_bucket.arn + + lifecycle { + create_before_destroy = true + replace_triggered_by = [random_string.s3_transform_suffix] + } +} + + +resource "aws_s3_bucket_notification" "transform_bucket_notification" { + bucket = aws_s3_bucket.transform_bucket.id + + lambda_function { + events = ["s3:ObjectCreated:*"] + lambda_function_arn = aws_lambda_function.transform_lambda.arn + } + + depends_on = [aws_lambda_permission.allow_s3_transform_bucket] +} diff --git a/terraform/iam.tf b/terraform/iam.tf new file mode 100644 index 0000000..3d62b69 --- /dev/null +++ b/terraform/iam.tf @@ -0,0 +1,202 @@ +# Description: This file contains the IAM roles and policies for the lambda functions +######################################################################## +# IAM MULTI-ROLE SETUP +######################################################################## + +# DEFINE MULTI-SERVICE ROLE (lambda, s3, cloudwatch, events) +resource "aws_iam_role" "multi_service_role" { + name = "multi_service_role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Action = "sts:AssumeRole" + Effect = "Allow" + Principal = { + Service = [ + "lambda.amazonaws.com", + "scheduler.amazonaws.com" + ] + } + } + ] + }) +} + + +######################################################################## +# S3 SETUP +# Description: allows allows retention/tagging/access control settings +# Lambda IAM Policy for S3 +######################################################################## + +# S3 DEFINE POLICY +data "aws_iam_policy_document" "s3_data_policy_doc" { + statement { + effect = "Allow" + actions = [ + "s3:PutObject", + "s3:PutObjectRetention", + "s3:PutObjectTagging", + "s3:PutObjectAcl", + "s3:ListObjects", + "s3:ListObjectsV2", + "s3:GetObject" + ] + resources = [ + "${aws_s3_bucket.extract_bucket.arn}/*", + "${aws_s3_bucket.transform_bucket.arn}/*", + "${aws_s3_bucket.lambda_code_bucket.arn}/*", + ] + } + + statement { + effect = "Allow" + actions = [ + "s3:ListBucket", + "s3:ListAllMyBuckets", + "s3:ListObjectsV2", + "s3:ListObjects" + ] + resources = [ + "arn:aws:s3:::*", + ] + } +} + + +######################################################################## +# LAMBDA SETUP +# Description: Allows Lambda permission to write to Cloudwatch logs +######################################################################## + +resource "aws_iam_policy" "lambda_execution_policy" { + name = "lambda_execution_policy" + path = "/" + description = "IAM policy for Lambda execution" + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Action = [ + "lambda:InvokeFunction", + "lambda:GetFunction" + ] + Resource = "*" + } + ] + } + ) +} + +######################################################################## +# CLOUDWATCH SETUP +# Description: Give permission for Lambda to write to CloudWatch logs +######################################################################## + +data "aws_iam_policy_document" "cw_document" { + statement { + actions = ["logs:CreateLogGroup"] + resources = [ + "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:*" + ] + } + + statement { + actions = [ + "logs:CreateLogStream", + "logs:CreateLogGroup", + "logs:PutLogEvents" + ] + resources = [ + "arn:aws:logs:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/*" + ] + } +} + +resource "aws_iam_policy" "cw_policy" { + name = "cw_policy" + policy = data.aws_iam_policy_document.cw_document.json +} + +######################################################################## +# POLICY WRITE & ATTACH +######################################################################## + +# S3 WRITE POLICY +resource "aws_iam_policy" "s3_write_policy" { + policy = data.aws_iam_policy_document.s3_data_policy_doc.json +} + +resource "aws_iam_role_policy_attachment" "s3_attachment" { + role = aws_iam_role.multi_service_role.name + policy_arn = aws_iam_policy.s3_write_policy.arn +} + +resource "aws_iam_role_policy_attachment" "lambda_attachment" { + role = aws_iam_role.multi_service_role.name + policy_arn = aws_iam_policy.lambda_execution_policy.arn +} + +resource "aws_iam_role_policy_attachment" "cw_attachment" { + role = aws_iam_role.multi_service_role.name + policy_arn = aws_iam_policy.cw_policy.arn +} + +################### +# EVENTS POLICIES # +################### + +data "aws_iam_policy_document" "cloudwatch_events_policy" { + statement { + actions = [ + "events:PutRule", + "events:PutTargets", + "events:RemoveTargets", + "events:DeleteRule", + "events:PutEvents" + ] + resources = ["*"] + effect = "Allow" + } +} + +resource "aws_iam_policy" "cloudwatch_events_policy" { + name = "cloudwatch_events_policy" + policy = data.aws_iam_policy_document.cloudwatch_events_policy.json +} + +resource "aws_iam_role_policy_attachment" "cloudwatch_events_attachment" { + role = aws_iam_role.multi_service_role.name + policy_arn = aws_iam_policy.cloudwatch_events_policy.arn +} + +######################### +# SECRETS MANAGER SETUP # +######################### + +# Policy Doc +data "aws_iam_policy_document" "secrets_manager_policy_doc" { + statement { + effect = "Allow" + actions = [ + "secretsmanager:GetSecretValue" + ] + resources = ["arn:aws:secretsmanager:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:secret:bentley-secrets-Na0yc8"] + } +} + +# SM Policy Resource +resource "aws_iam_policy" "secrets_manager_policy" { + name = "secrets_manager_policy" + policy = data.aws_iam_policy_document.secrets_manager_policy_doc.json +} + +# Attach SM Policy to Role +resource "aws_iam_role_policy_attachment" "secrets_manager_attachment" { + role = aws_iam_role.multi_service_role.name + policy_arn = aws_iam_policy.secrets_manager_policy.arn +} diff --git a/terraform/lambda.tf b/terraform/lambda.tf new file mode 100644 index 0000000..f8e7515 --- /dev/null +++ b/terraform/lambda.tf @@ -0,0 +1,146 @@ +#################### +# Common Variables # +#################### + +locals { + layer_dir = "../" + layer_zip = "layer.zip" + layer_name = "lambda_layer" + script_dir = "../scripts" + layer_zip_path = "${local.layer_dir}/${local.layer_zip}" +} + +###################### +# Lambda Layer Setup # +###################### + +resource "null_resource" "prepare_layer" { + + # New change: only run the script if the layer zip does not exist + + triggers = { + layer_zip_exists = fileexists(local.layer_zip_path) ? "exists" : "not_exists" + } + + provisioner "local-exec" { + command = "if [ ! -f ${local.layer_zip_path} ]; then bash ${local.script_dir}/make_layer_zip.sh; fi" + } +} + +resource "aws_s3_object" "lambda_layer_zip" { + bucket = aws_s3_bucket.lambda_code_bucket.id #bucket instead of id + key = "${local.layer_name}/${local.layer_zip}" + source = "${local.layer_dir}/${local.layer_zip}" + depends_on = [null_resource.prepare_layer] + etag = fileexists(local.layer_zip_path) ? filemd5(local.layer_zip_path) : null +} + +resource "aws_lambda_layer_version" "lambda_layer" { + layer_name = local.layer_name + compatible_runtimes = ["python3.11"] + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.lambda_layer_zip.key + source_code_hash = fileexists(local.layer_zip_path) ? filebase64sha256(local.layer_zip_path) : null + skip_destroy = true + depends_on = [aws_s3_object.lambda_layer_zip] +} + +########################### +# Extract Lambda Function # +########################### + +data "archive_file" "extract_lambda_zip" { + type = "zip" + source_file = "${path.module}/../src/extract_lambda.py" + output_path = "${path.module}/../extract_function.zip" +} +resource "aws_s3_object" "extract_lambda_code" { + bucket = aws_s3_bucket.lambda_code_bucket.bucket + key = "${var.extract_lambda_name}/extract_function.zip" + source = data.archive_file.extract_lambda_zip.output_path + etag = filemd5(data.archive_file.extract_lambda_zip.output_path) +} + +resource "aws_lambda_function" "extract_lambda" { + function_name = var.extract_lambda_name + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.extract_lambda_code.key + layers = [aws_lambda_layer_version.lambda_layer.arn] + role = aws_iam_role.multi_service_role.arn + handler = "extract_lambda.lambda_handler" + runtime = "python3.11" + source_code_hash = data.archive_file.extract_lambda_zip.output_base64sha256 + + lifecycle { + create_before_destroy = true + } + + depends_on = [aws_s3_object.extract_lambda_code] +} + +############################# +# Transform Lambda Function # +############################# + +data "archive_file" "transform_lambda_zip" { + type = "zip" + source_file = "${path.module}/../src/transform_lambda.py" + output_path = "${path.module}/../transform_function.zip" +} +resource "aws_s3_object" "transform_lambda_code" { + bucket = aws_s3_bucket.lambda_code_bucket.bucket + key = "${var.transform_lambda_name}/transform_function.zip" + source = data.archive_file.transform_lambda_zip.output_path + etag = filemd5(data.archive_file.transform_lambda_zip.output_path) +} + +resource "aws_lambda_function" "transform_lambda" { + function_name = var.transform_lambda_name + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.transform_lambda_code.key + layers = [aws_lambda_layer_version.lambda_layer.arn] + role = aws_iam_role.multi_service_role.arn + handler = "transform_lambda.lambda_handler" + runtime = "python3.11" + source_code_hash = data.archive_file.transform_lambda_zip.output_base64sha256 + + lifecycle { + create_before_destroy = true + } + + depends_on = [aws_s3_object.transform_lambda_code] +} + +######################## +# Load Lambda Function # +######################## + +data "archive_file" "load_lambda_zip" { + type = "zip" + source_file = "${path.module}/../src/load_lambda.py" + output_path = "${path.module}/../load_function.zip" +} +resource "aws_s3_object" "load_lambda_code" { + bucket = aws_s3_bucket.lambda_code_bucket.bucket + key = "${var.load_lambda_name}/load_function.zip" + source = data.archive_file.load_lambda_zip.output_path + etag = filemd5(data.archive_file.load_lambda_zip.output_path) +} + +resource "aws_lambda_function" "load_lambda" { + function_name = var.load_lambda_name + s3_bucket = aws_s3_bucket.lambda_code_bucket.bucket + s3_key = aws_s3_object.load_lambda_code.key + layers = [aws_lambda_layer_version.lambda_layer.arn] + role = aws_iam_role.multi_service_role.arn + handler = "load_lambda.lambda_handler" + runtime = "python3.11" + source_code_hash = data.archive_file.load_lambda_zip.output_base64sha256 + + lifecycle { + create_before_destroy = true + } + + depends_on = [aws_s3_object.load_lambda_code] +} + diff --git a/terraform/main.tf b/terraform/main.tf new file mode 100644 index 0000000..6577b70 --- /dev/null +++ b/terraform/main.tf @@ -0,0 +1,40 @@ +terraform { + required_version = ">= 1.8.0" + required_providers { + aws = { + source = "hashicorp/aws" + version = "~>5.0" + } + null = { + source = "hashicorp/null" + version = "~>3.2.2" + } + archive = { + source = "hashicorp/archive" + version = "~>2.5.0" + } + random = { + source = "hashicorp/random" + version = "~>3.6.2" + } + } + backend "s3" { + bucket = "bentley-project-secrets" + key = "bentley-project/terraform.tfstate" + region = "eu-west-2" + encrypt = true + } +} + +provider "aws" { + region = "eu-west-2" + default_tags { + tags = { + ProjectName = var.project_name + Environment = var.environment + ManagedBy = "Terraform" + GitHubRepo = var.github_repo + Team = var.team_name + } + } +} diff --git a/terraform/s3.tf b/terraform/s3.tf new file mode 100644 index 0000000..14e8835 --- /dev/null +++ b/terraform/s3.tf @@ -0,0 +1,57 @@ +######################## +# EXTRACT BUCKET SETUP # +######################## + +resource "aws_s3_bucket" "extract_bucket" { + bucket_prefix = "${var.s3_extract_bucket_name}-" + force_destroy = true + tags = { + Name = "Ingestion Bucket" + } +} + +resource "aws_s3_bucket_versioning" "extract_bucket_versioning" { + bucket = aws_s3_bucket.extract_bucket.id + versioning_configuration { + status = "Enabled" + } +} + +########################## +# TRANSFORM BUCKET SETUP # +########################## + +resource "aws_s3_bucket" "transform_bucket" { + bucket_prefix = "${var.s3_transform_bucket_name}-" + force_destroy = true + tags = { + Name = "Transform Bucket" + } +} + + +resource "aws_s3_bucket_versioning" "transform_bucket_versioning" { + bucket = aws_s3_bucket.transform_bucket.id + versioning_configuration { + status = "Enabled" + } +} + +####################### +# LAMBDA BUCKET SETUP # +####################### + +resource "aws_s3_bucket" "lambda_code_bucket" { + bucket_prefix = "${var.s3_code_bucket_name}-" + force_destroy = true + tags = { + Name = "Lambda Bucket" + } +} + +resource "aws_s3_bucket_versioning" "lambda_bucket_versioning" { + bucket = aws_s3_bucket.lambda_code_bucket.id + versioning_configuration { + status = "Enabled" + } +} diff --git a/terraform/vars.tf b/terraform/vars.tf new file mode 100644 index 0000000..b3e3e47 --- /dev/null +++ b/terraform/vars.tf @@ -0,0 +1,53 @@ +variable "s3_extract_bucket_name" { + type = string + default = "extract-bucket" +} + +variable "s3_transform_bucket_name" { + type = string + default = "transform-bucket" +} + +variable "s3_code_bucket_name" { + type = string + default = "lambda-bucket" +} + +variable "extract_lambda_name" { + type = string + default = "extract-lambda" +} + +variable "transform_lambda_name" { + type = string + default = "transform-lambda" +} + +variable "load_lambda_name" { + type = string + default = "load-lambda" +} + +variable "project_name" { + type = string + default = "tt" +} + +variable "environment" { + type = string + default = "dev" +} + +variable "github_repo" { + type = string + default = "de-project-bentley" +} + +variable "team_name" { + type = string + default = "Team-Bentley" +} + +data "aws_caller_identity" "current" {} + +data "aws_region" "current" {} diff --git a/test.py b/test.py deleted file mode 100644 index e69de29..0000000 --- a/test.py +++ /dev/null diff --git a/tests/dummy.txt b/tests/dummy.txt new file mode 100644 index 0000000..af27ff4 --- /dev/null +++ b/tests/dummy.txt @@ -0,0 +1 @@ +This is a test file.
\ No newline at end of file diff --git a/tests/dummy_identical.csv b/tests/dummy_identical.csv new file mode 100644 index 0000000..e44e9fc --- /dev/null +++ b/tests/dummy_identical.csv @@ -0,0 +1,4 @@ +Food_type,Flavour,Colour,last_updated +Vegetable,Sour,Green,2022-11-03 14:20:49.962 +Berry,Sweet,Red,2022-11-03 14:20:49.962 + diff --git a/tests/test_extract_lambda.py b/tests/test_extract_lambda.py new file mode 100644 index 0000000..548ce67 --- /dev/null +++ b/tests/test_extract_lambda.py @@ -0,0 +1,247 @@ +import boto3.exceptions +import botocore.exceptions +import pytest +import boto3 +from moto import mock_aws +from unittest.mock import patch, MagicMock +from unittest import TestCase +import os +import logging +import json +from src.extract_lambda import ( + list_existing_s3_files, + connect_to_database, + DBConnectionException, + lambda_handler, + process_and_upload_tables, + retrieve_secrets, + extract_bucket, +) + + +@pytest.fixture(scope="class") +def mock_config(): + env_vars = { + "host": "abc", + "port": "5432", + "user": "def", + "password": "password", + "database": "db", + } + with patch( + "src.extract_lambda.retrieve_secrets", return_value=env_vars + ) as mock_config: + yield mock_config + + +@pytest.fixture(scope="class") +def aws_credentials(): + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" + + +@pytest.fixture(scope="class") +def s3_client(aws_credentials): + with mock_aws(): + yield boto3.client("s3") + + +@pytest.fixture(scope="class") +def s3_mock_bucket(s3_client): + bucket = s3_client.create_bucket( + Bucket="extract_bucket", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) + return bucket + + +class TestLambdaHandler: + def test_files_processed_and_uploaded_successfully(self, mocker): + mock_db = MagicMock() + mock_db.run.side_effect = [ + [["Fruits"]], + [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]], + [["Food_type"], ["Flavour"], ["Colour"]], + ] + mock_db.columns.return_value = [ + {"name": "Food_type"}, + {"name": "Flavour"}, + {"name": "Colour"}, + ] + with patch("src.extract_lambda.connect_to_database", return_value=mock_db): + mock_process_and_upload_tables = mocker.patch( + "src.extract_lambda.process_and_upload_tables", + return_value={ + "updated": ["Fruits"], + "no change": ["Vegetable", "Berry"], + }, + ) + mock_list_existing_s3_files = mocker.patch( + "src.extract_lambda.list_existing_s3_files", return_value={} + ) + event = {} + context = {} + response = lambda_handler(event, context) + assert response["statusCode"] == 200 + assert json.loads(response["body"]) == ( + "CSV files processed for Fruits and uploaded successfully." + "The following tables were not updated: Vegetable, Berry" + ) + mock_list_existing_s3_files.assert_called_once() + mock_process_and_upload_tables.assert_called_once_with(mock_db, {}) + mock_db.close.assert_called_once() + + def test_no_changes_detected_no_files_uploaded(self, mocker): + mock_db = MagicMock() + mock_db.run.side_effect = [ + [["Fruits"]], + [["Vegetable", "Sour", "Green"], ["Berry", "Sweet", "Red"]], + [["Food_type"], ["Flavour"], ["Colour"]], + ] + mock_db.columns.return_value = [ + {"name": "Food_type"}, + {"name": "Flavour"}, + {"name": "Colour"}, + ] + + with patch("src.extract_lambda.connect_to_database", return_value=mock_db): + mock_process_and_upload_tables = mocker.patch( + "src.extract_lambda.process_and_upload_tables", + return_value={"updated": [], "no change": ["Fruits"]}, + ) + mock_list_existing_s3_files = mocker.patch( + "src.extract_lambda.list_existing_s3_files", return_value={} + ) + event = {} + context = {} + response = lambda_handler(event, context) + assert response["statusCode"] == 200 + assert ( + json.loads(response["body"]) + == "No changes detected, no CSV files were uploaded." + ) + mock_list_existing_s3_files.assert_called_once() + mock_process_and_upload_tables.assert_called_once_with(mock_db, {}) + mock_db.close.assert_called_once() + + def test_exception_error(self, mocker): + with patch( + "src.extract_lambda.connect_to_database", + side_effect=Exception("Database connection error"), + ): + mock_process_and_upload_tables = mocker.patch( + "src.extract_lambda.process_and_upload_tables" + ) + mock_list_existing_s3_files = mocker.patch( + "src.extract_lambda.list_existing_s3_files" + ) + event = {} + context = {} + response = lambda_handler(event, context) + assert response["statusCode"] == 500 + assert json.loads(response["body"]) == "Internal server error." + mock_list_existing_s3_files.assert_not_called() + mock_process_and_upload_tables.assert_not_called() + + +class TestExtractBucket: + def test_extract_bucket_returns_bucket_name(self, s3_client, s3_mock_bucket): + result = extract_bucket(s3_client) + assert result == "extract_bucket" + + def test_bucket_returns_first_bucket(self, s3_client): + bucket1 = s3_client.create_bucket( + Bucket="bucket1", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) + result = extract_bucket(s3_client) + assert result == "extract_bucket" + + def test_returns_index_error_if_no_buckets(self, s3_client): + s3_client.delete_bucket(Bucket="extract_bucket") + s3_client.delete_bucket(Bucket="bucket1") + + with pytest.raises(IndexError, match="list index out of range"): + extract_bucket(s3_client) + + +class TestListExistingS3Files: + def test_error_if_no_bucket(self, s3_client, caplog): + logger = logging.getLogger() + logger.info("Testing now.") + caplog.set_level(logging.ERROR) + list_existing_s3_files(client=s3_client) + assert "Error listing S3 objects" in caplog.text + + def test_error_if_bucket_is_empty(self, s3_client, caplog, s3_mock_bucket): + list_existing_s3_files("extract_bucket", client=s3_client) + assert "The bucket is empty" in caplog.text + + def test_retrieves_file_content(self, s3_client, caplog, s3_mock_bucket): + s3_client.upload_file("tests/dummy.txt", "extract_bucket", "dummy.txt") + result = list_existing_s3_files("extract_bucket", client=s3_client) + assert list(result.values()) == ["This is a test file."] + + +class TestConnectToDatabase: + def test_connect_to_database(mock_conn, mock_config): + with patch("src.extract_lambda.Connection", autospec=True) as mock_conn: + connect_to_database() + mock_conn.assert_called_with( + host="abc", user="def", port="5432", password="password", database="db" + ) + + def test_database_error(self, mock_config): # had mock_config in param + with pytest.raises(DBConnectionException): + connect_to_database() + + def test_logs_interface_error(self, caplog): + logger = logging.getLogger() + logger.info("Testing now.") + caplog.set_level(logging.ERROR) + with pytest.raises(DBConnectionException): + connect_to_database() + assert "Interface error" in caplog.text + + +class TestProcessAndUploadTables: + def test_error_process_and_upload_tables(self, mock_conn, s3_client, caplog): + caplog.set_level(logging.INFO) + + # Mock return values for database queries + queries = [ + "SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE';", + "SELECT * FROM Fruits WHERE last_updated > :latest;", + "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS where table_name = 'Fruits';", + ] + return_values = [ + [["Fruits"]], + [], # No new rows with a more recent last_updated timestamp + [["Food_type"], ["Flavour"], ["Colour"], ["last_updated"]], + ] + vals = dict(zip(queries, return_values)) + + # Patch the database connection and set return values for queries + with patch("src.extract_lambda.Connection") as mock_db: + mock_db().run.side_effect = return_values + s3_key = "Fruits/2024/08/15/Fruits_16:46:30.csv" + existing_files = { + s3_key: "Food_type,Flavour,Colour,last_updated\nVegetable,Sour,Green,2022-11-03 14:20:49.962\nBerry,Sweet,Red,2022-11-03 14:20:49.962" + } + + # Simulate S3 bucket and file setup + s3_client.create_bucket( + Bucket="test_extract_bucket", + CreateBucketConfiguration={"LocationConstraint": "eu-west-2"}, + ) + s3_client.upload_file( + "tests/dummy_identical.csv", "test_extract_bucket", s3_key + ) + + # Run the process_and_upload_tables function + process_and_upload_tables(mock_db(), existing_files, client=s3_client) + # Assert that the log contains "No new data" + assert "No new data" in caplog.text diff --git a/tests/test_secrets_manager.py b/tests/test_secrets_manager.py new file mode 100644 index 0000000..609c572 --- /dev/null +++ b/tests/test_secrets_manager.py @@ -0,0 +1,84 @@ +from src.secrets_manager import sm_client, retrieve_secrets +import boto3 +import botocore.exceptions +from moto import mock_aws +import json +import pytest +import os + + +@pytest.fixture(scope="function") +def aws_credentials(): + """Mocked AWS Credentials for moto.""" + os.environ["AWS_ACCESS_KEY_ID"] = "testing" + os.environ["AWS_SECRET_ACCESS_KEY"] = "testing" + os.environ["AWS_SECURITY_TOKEN"] = "testing" + os.environ["AWS_SESSION_TOKEN"] = "testing" + os.environ["AWS_DEFAULT_REGION"] = "eu-west-2" + + +@pytest.fixture(scope="function") +def mock_sm_client(aws_credentials): + with mock_aws(): + yield boto3.client("secretsmanager") + + +@pytest.fixture(scope="function") +def mock_store_secret(mock_sm_client): + secret = { + "cohort_id": "test_cohort_id", + "user": "test_user_id", + "password": "test_password", + "host": "test_host", + "database": "test_database", + "port": "test_port", + } + + secret_name = "test_secret" + + response = mock_sm_client.create_secret( + Name=secret_name, SecretString=json.dumps(secret) + ) + + return response + + +def test_retrieves_secrets_returns_dictionary(mock_sm_client, mock_store_secret): + secret_name = "test_secret" + + result = retrieve_secrets(mock_sm_client, secret_name) + + assert isinstance(result, dict) + + +def test_retrieves_secrets_returns_correct_keys_and_values( + mock_sm_client, mock_store_secret +): + secret_name = "test_secret" + + result = retrieve_secrets(mock_sm_client, secret_name) + + assert result["cohort_id"] == "test_cohort_id" + assert result["user"] == "test_user_id" + assert result["password"] == "test_password" + assert result["host"] == "test_host" + assert result["database"] == "test_database" + assert result["port"] == "test_port" + + +def test_retrieves_secrets_raises_error_if_secret_name_incorrect_data_type( + mock_sm_client, +): + secret_name = [1, 2, 3] + + with pytest.raises(botocore.exceptions.ParamValidationError) as error: + retrieve_secrets(mock_sm_client, secret_name) + + +def test_retrieves_secrets_raises_error_if_secret_name_does_not_exist( + mock_sm_client, mock_store_secret +): + secret_name = "test_secret_2" + + with pytest.raises(botocore.exceptions.ClientError) as error: + retrieve_secrets(mock_sm_client, secret_name) |
