Containerized data processing workflow
Containerized data processing workflow from the GitHub Copilot workshop. The directory structure will need to be reorganized in future commits.
tmanik committed Jul 30, 2024
1 parent ca2accc commit 0b1d9aa
Showing 14 changed files with 908 additions and 1 deletion.
Binary file added .DS_Store
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
scripts/data/combined_data_cleaned.csv
scripts/data/combined_data.csv

scripts/extract_done
scripts/transform_done
18 changes: 18 additions & 0 deletions Dockerfile
@@ -0,0 +1,18 @@
# Use the official Python base image
FROM python:3.9

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt /app/

# Install the required packages
RUN apt-get update && apt-get install -y libpq-dev
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of your application code into the container
COPY . /app/

# Default command (can be overridden by docker-compose); the entrypoint scripts live under app/
CMD ["sh", "app/entrypoint_extract.sh"]
3 changes: 2 additions & 1 deletion README.md
@@ -64,4 +64,5 @@ Container-Orchestration-for-Research-Workflows/
│ │ │ │ ├── components/
│ │ │ │ │ ├── VisualizationComponent.js
│ │ │ │ ├── public/
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── index.html
```
10 changes: 10 additions & 0 deletions app/entrypoint_extract.sh
@@ -0,0 +1,10 @@
#!/bin/bash
set -e

echo "Starting extract process..."
python /app/scripts/extract.py

echo "Extract process completed."

# Signal successful completion
touch /app/scripts/extract_done
11 changes: 11 additions & 0 deletions app/entrypoint_load.sh
@@ -0,0 +1,11 @@
#!/bin/sh
set -e

# Wait for transform stage to complete
while [ ! -f /app/scripts/transform_done ]; do
  echo "Waiting for transform stage to complete..."
  sleep 5
done

# Run the load script
python /app/scripts/load.py
14 changes: 14 additions & 0 deletions app/entrypoint_transform.sh
@@ -0,0 +1,14 @@
#!/bin/sh
set -e

# Wait for extract stage to complete
while [ ! -f /app/scripts/extract_done ]; do
  echo "Waiting for extract stage to complete..."
  sleep 5
done

# Run the transform script
python /app/scripts/transform.py

# Signal successful completion
touch /app/scripts/transform_done
41 changes: 41 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,41 @@
services:
  postgres:
    image: postgres
    container_name: postgres-container
    environment:
      POSTGRES_PASSWORD: mysecretpassword
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  extract:
    build: .
    depends_on:
      - postgres
    environment:
      DB_URL: "postgresql://postgres:mysecretpassword@postgres:5432/postgres"
    volumes:
      - .:/app
    entrypoint: ["/app/app/entrypoint_extract.sh"]

  transform:
    build: .
    depends_on:
      - extract
    volumes:
      - .:/app
    entrypoint: ["/app/app/entrypoint_transform.sh"]

  load:
    build: .
    depends_on:
      - transform
    environment:
      DB_URL: "postgresql://postgres:mysecretpassword@postgres:5432/postgres"
    volumes:
      - .:/app
    entrypoint: ["/app/app/entrypoint_load.sh"]

volumes:
  postgres_data:
18 changes: 18 additions & 0 deletions requirements.txt
@@ -0,0 +1,18 @@
# Data extraction and transformation
pandas==1.5.2
numpy==1.23.5
python-dotenv==0.20.0

# Database interaction (assuming Postgres)
psycopg2==2.9.5

# ORM (Object-Relational Mapping)
sqlalchemy==1.4.39

# AWS SDK
boto3==1.26.13

# Other utilities
click==8.1.3
Flask==2.3.2
Binary file added scripts/.DS_Store
653 changes: 653 additions & 0 deletions scripts/data/fm-ad-notebook-visualization-COMPLETED.ipynb


48 changes: 48 additions & 0 deletions scripts/extract.py
@@ -0,0 +1,48 @@
import boto3
import pandas as pd
from botocore import UNSIGNED
from botocore.config import Config
from io import StringIO
import os

def download_tsv_from_s3(bucket_name, output_file):
    # Initialize an S3 client (unsigned requests, since the bucket is public)
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

    # Get a list of all objects in the bucket
    response = s3.list_objects_v2(Bucket=bucket_name)

    # Initialize an empty list to hold the dataframes
    dataframes = []

    # Loop through each object in the bucket
    for obj in response.get('Contents', []):
        key = obj['Key']

        # Check if the key represents a TSV file
        if key.endswith('.tsv'):
            # Fetch the file
            file_obj = s3.get_object(Bucket=bucket_name, Key=key)
            # Read the file content as a string
            file_content = file_obj['Body'].read().decode('utf-8')
            # Convert the string to a DataFrame
            df = pd.read_csv(StringIO(file_content), sep='\t')
            # Append the dataframe to the list
            dataframes.append(df)

    # Concatenate all the dataframes in the list into a single dataframe
    combined_df = pd.concat(dataframes, ignore_index=True)

    # Save the dataframe to a CSV file
    combined_df.to_csv(output_file, index=False)
    print(f"Combined data saved to {output_file}")

if __name__ == "__main__":
    # Public FM-AD open-access bucket; replace with your own bucket name if needed
    bucket_name = 'gdc-fm-ad-phs001179-2-open'

    # Define the output file path relative to the script location
    script_dir = os.path.dirname(os.path.abspath(__file__))
    output_file = os.path.join(script_dir, 'data', 'combined_data.csv')

    download_tsv_from_s3(bucket_name, output_file)
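Note: list_objects_v2 returns at most 1,000 keys per call, so a bucket with more TSV files than that would only be partially read. A minimal sketch of a pagination-safe listing using boto3's built-in paginator (the helper name is illustrative; same bucket and suffix check as above):

import boto3
from botocore import UNSIGNED
from botocore.config import Config

def list_tsv_keys(bucket_name):
    # Yield every .tsv key in the bucket, across all result pages
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name):
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.tsv'):
                yield obj['Key']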
26 changes: 26 additions & 0 deletions scripts/load.py
@@ -0,0 +1,26 @@
import pandas as pd
from sqlalchemy import create_engine
import os

def load_data_to_postgres(csv_file, db_url, table_name):
    # Create a SQLAlchemy engine
    engine = create_engine(db_url)

    # Read the CSV file into a DataFrame
    df = pd.read_csv(csv_file)

    # Load data into the PostgreSQL table, replacing it if it already exists
    df.to_sql(table_name, engine, if_exists='replace', index=False)
    print(f"Data loaded into {table_name} table")

if __name__ == "__main__":
    # Define the input file path relative to the script location
    script_dir = os.path.dirname(os.path.abspath(__file__))
    csv_file = os.path.join(script_dir, 'data', 'combined_data_cleaned.csv')

    # Database connection details: use DB_URL from docker-compose.yml,
    # falling back to the default connection string if it is not set
    db_url = os.environ.get('DB_URL', 'postgresql://postgres:mysecretpassword@postgres:5432/postgres')
    table_name = 'cleaned_data'

    # Load data
    load_data_to_postgres(csv_file, db_url, table_name)
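As a quick way to confirm the load step worked, the new table can be queried back through the same SQLAlchemy engine. A small sketch, assuming the DB_URL and table name used above:

import os
import pandas as pd
from sqlalchemy import create_engine

# Reuse the DB_URL set in docker-compose.yml, with the same local fallback
engine = create_engine(os.environ.get(
    'DB_URL', 'postgresql://postgres:mysecretpassword@postgres:5432/postgres'))

# Row count and a peek at the first few rows of the loaded table
print(pd.read_sql('SELECT COUNT(*) AS n FROM cleaned_data', engine))
print(pd.read_sql('SELECT * FROM cleaned_data LIMIT 5', engine))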
62 changes: 62 additions & 0 deletions scripts/transform.py
@@ -0,0 +1,62 @@
import pandas as pd
import numpy as np
import os

def transform_data(input_file, output_file):
    # Load the CSV file
    df = pd.read_csv(input_file)

    # Define columns to be removed
    columns_to_remove = [
        "cases.submitter_id",
        "demographic.submitter_id",
        "demographic_id",
        "diagnoses.submitter_id",
        "diagnosis_id",
        "samples.submitter_id",
        "sample_id",
        "aliquots.submitter_id",
        "aliquot_id",
        "read_groups.submitter_id",
        "read_group_id",
        "read_groups.library_name",
        "read_groups.name",
        "read_groups.experiment_name",
    ]

    # Remove specified columns
    df.drop(columns=columns_to_remove, inplace=True)

    # Replace 'Unknown' with NaN
    df.replace('Unknown', np.nan, inplace=True)

    # Remove duplicates
    df.drop_duplicates(inplace=True)

    # Group by 'case_id' and keep the first entry of each group
    df = df.groupby('case_id').first().reset_index()

    # Add a new column 'diagnoses.age_at_diagnosis_years'
    df['diagnoses.age_at_diagnosis_years'] = df['diagnoses.age_at_diagnosis'] / 365

    # Filter out rows where 'diagnoses.age_at_diagnosis_years' is 89 or older
    df = df[df['diagnoses.age_at_diagnosis_years'] < 89]

    # Floor the age at diagnosis to the nearest year and convert to int
    df['diagnoses.age_at_diagnosis_years'] = df['diagnoses.age_at_diagnosis_years'].apply(np.floor).astype(int)

    # Drop the original 'diagnoses.age_at_diagnosis' column
    df.drop(columns=['diagnoses.age_at_diagnosis'], inplace=True)

    # Save the cleaned dataframe to a new CSV file
    df.to_csv(output_file, index=False)
    print(f"Transformed data saved to {output_file}")

if __name__ == "__main__":
    # Define input and output file paths relative to the script location
    script_dir = os.path.dirname(os.path.abspath(__file__))
    input_file = os.path.join(script_dir, 'data', 'combined_data.csv')
    output_file = os.path.join(script_dir, 'data', 'combined_data_cleaned.csv')

    # Run the transformation
    transform_data(input_file, output_file)
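A lightweight sanity check on the cleaned file can confirm the transform's invariants: one row per case_id, no literal 'Unknown' strings left, and all ages under 89. A sketch, assuming the output path above:

import os
import pandas as pd

script_dir = os.path.dirname(os.path.abspath(__file__))
df = pd.read_csv(os.path.join(script_dir, 'data', 'combined_data_cleaned.csv'))

# One row per case after the groupby/first step
assert df['case_id'].is_unique

# 'Unknown' values were replaced with NaN before saving
assert not df.isin(['Unknown']).any().any()

# Ages at or above 89 were filtered out
assert df['diagnoses.age_at_diagnosis_years'].max() < 89

print('Sanity checks passed:', len(df), 'rows')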
