Added ELT pipeline implementation
01-basic-deployment is an ETL pipeline.

02-advanced-deployment is an ELT pipeline. Its purpose is to handle frequently updated data sources.
tmanik committed Aug 7, 2024
1 parent 13639cb commit 1fbc666
Showing 15 changed files with 318 additions and 138 deletions.
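
Assuming the Compose v2 CLI and the .env file added below, the whole pipeline can be brought up end to end (a sketch, not part of the commit itself):

cd 02-advanced-deployment
docker compose up --build

Note that depends_on only controls start order (postgres, then extract, load, transform); the actual sequencing is enforced by the done-file markers each entrypoint script writes to the shared-data volume.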
Binary file modified .DS_Store
Binary file not shown.
5 changes: 3 additions & 2 deletions .gitignore
@@ -6,5 +6,6 @@
 01-basic-deployment/scripts/extract_done
 01-basic-deployment/scripts/transform_done
 
-# ignore the data files from 02-advanced-deployment
-02-advanced-deployment/docker/data/*
+# ignore drafts
+01-basic-deployment-new/*
+02-advanced-deployment-old/*
5 changes: 5 additions & 0 deletions 02-advanced-deployment/.env
@@ -0,0 +1,5 @@
DB_NAME=weather_data
DB_USER=your_user
DB_PASSWORD=your_password
DB_HOST=postgres
DB_PORT=5432
29 changes: 29 additions & 0 deletions 02-advanced-deployment/Dockerfile
@@ -0,0 +1,29 @@
# Base image
FROM python:3.9-slim AS base
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Extract stage
FROM base AS extract
# Ensure the working directory is set
WORKDIR /usr/src/app
COPY scripts/extract.py ./
COPY entrypoints/extract.sh ./
RUN chmod +x extract.sh
ENTRYPOINT ["sh", "extract.sh"]

# Load stage
FROM base AS load
# Ensure the working directory is set
WORKDIR /usr/src/app
COPY scripts/load.py ./
COPY entrypoints/load.sh ./
RUN chmod +x load.sh
ENTRYPOINT ["sh", "load.sh"]

# Transform stage
FROM base AS transform
# Ensure the working directory is set
WORKDIR /usr/src/app
COPY scripts/transform.py ./
COPY entrypoints/transform.sh ./
RUN chmod +x transform.sh
ENTRYPOINT ["sh", "transform.sh"]
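
All three stages extend the same base layer, so the pip install is shared between them. An individual stage can also be built on its own by naming its target (a sketch, run from 02-advanced-deployment/; the image tag is illustrative):

docker build --target extract -t elt-extract .

Compose resolves the same stage names through each service's build.target in the file below.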
64 changes: 64 additions & 0 deletions 02-advanced-deployment/docker-compose.yml
@@ -0,0 +1,64 @@
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: ${DB_USER}
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: ${DB_NAME}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  extract:
    build:
      context: .
      target: extract
    environment:
      - DB_NAME=${DB_NAME}
      - DB_USER=${DB_USER}
      - DB_PASSWORD=${DB_PASSWORD}
      - DB_HOST=${DB_HOST}
      - DB_PORT=${DB_PORT}
    volumes:
      # the scripts are baked into the image at build time; bind-mounting
      # ./scripts over /usr/src/app would shadow the entrypoint script copied there
      - shared-data:/data
    depends_on:
      - postgres

  load:
    build:
      context: .
      target: load
    environment:
      - DB_NAME=${DB_NAME}
      - DB_USER=${DB_USER}
      - DB_PASSWORD=${DB_PASSWORD}
      - DB_HOST=${DB_HOST}
      - DB_PORT=${DB_PORT}
    volumes:
      - shared-data:/data
    depends_on:
      - extract

  transform:
    build:
      context: .
      target: transform
    environment:
      - DB_NAME=${DB_NAME}
      - DB_USER=${DB_USER}
      - DB_PASSWORD=${DB_PASSWORD}
      - DB_HOST=${DB_HOST}
      - DB_PORT=${DB_PORT}
    volumes:
      - shared-data:/data
    depends_on:
      - load

volumes:
  pgdata:
  shared-data:
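
One operational note: the done-file markers live on the named shared-data volume, so they outlive the containers; a stale extract_done from an earlier run would let the load stage start immediately. Resetting state between runs (a sketch; -v removes the pgdata and shared-data volumes along with the containers):

docker compose down -v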
21 changes: 0 additions & 21 deletions 02-advanced-deployment/docker/Dockerfile

This file was deleted.

28 changes: 0 additions & 28 deletions 02-advanced-deployment/docker/docker-compose.yml

This file was deleted.

87 changes: 0 additions & 87 deletions 02-advanced-deployment/docker/extract_and_transform.py

This file was deleted.

13 changes: 13 additions & 0 deletions 02-advanced-deployment/entrypoints/extract.sh
@@ -0,0 +1,13 @@
#!/bin/bash
set -e

# Ensure /data directory exists
mkdir -p /data

echo "Starting extract process..."
python /usr/src/app/extract.py

echo "Extract process completed."

# Signal successful completion
touch /data/extract_done
17 changes: 17 additions & 0 deletions 02-advanced-deployment/entrypoints/load.sh
@@ -0,0 +1,17 @@
#!/bin/bash
set -e

# Ensure /data directory exists
mkdir -p /data

# Wait for extract stage to complete
while [ ! -f /data/extract_done ]; do
  echo "Waiting for extract stage to complete..."
  sleep 5
done

# Run the load script
python /usr/src/app/load.py

# Signal successful completion
touch /data/load_done
17 changes: 17 additions & 0 deletions 02-advanced-deployment/entrypoints/transform.sh
@@ -0,0 +1,17 @@
#!/bin/bash
set -e

# Ensure /data directory exists
mkdir -p /data

# Wait for load stage to complete
while [ ! -f /data/load_done ]; do
  echo "Waiting for load stage to complete..."
  sleep 5
done

# Run the transform script
python /usr/src/app/transform.py

# Signal successful completion
touch /data/transform_done
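
All three entrypoints use the same done-file handshake, which polls forever; if an upstream stage crashes before touching its marker, the loop never exits. A bounded variant (a sketch, not part of this commit):

for i in $(seq 1 60); do
  [ -f /data/load_done ] && break
  echo "Waiting for load stage to complete ($i/60)..."
  sleep 5
done
[ -f /data/load_done ] || { echo "load stage never completed" >&2; exit 1; }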
File renamed without changes.
32 changes: 32 additions & 0 deletions 02-advanced-deployment/scripts/extract.py
@@ -0,0 +1,32 @@
# extract.py

import boto3
from botocore.config import Config
from botocore import UNSIGNED
import os

def extract_noaa_gsod_data(year, month, output_dir='/data'):
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    bucket_name = 'noaa-gsod-pds'
    prefix = f'{year}/{str(month).zfill(2)}'

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # List objects in the bucket for the specified month
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

    if 'Contents' not in response:
        print(f"No files found for {prefix}")
        return

    for obj in response['Contents']:
        key = obj['Key']
        local_path = os.path.join(output_dir, os.path.basename(key))

        # Download the file
        s3.download_file(bucket_name, key, local_path)
        print(f'Downloaded {key} to {local_path}')

if __name__ == '__main__':
    extract_noaa_gsod_data(2020, 1)
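
list_objects_v2 returns at most 1,000 keys per response, so a prefix with more files than that would be silently truncated. A paginated variant using boto3's standard paginator (a sketch, reusing the names defined in extract.py above):

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    for obj in page.get('Contents', []):
        key = obj['Key']
        s3.download_file(bucket_name, key, os.path.join(output_dir, os.path.basename(key)))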
74 changes: 74 additions & 0 deletions 02-advanced-deployment/scripts/load.py
@@ -0,0 +1,74 @@
# load.py

import pandas as pd
import glob
import os
from sqlalchemy import create_engine, text

def load_data_to_postgres(input_dir='/data', db_name='weather_data'):
    db_user = os.getenv('DB_USER', 'your_user')
    db_password = os.getenv('DB_PASSWORD', 'your_password')
    db_host = os.getenv('DB_HOST', 'postgres')  # Ensure this is 'postgres' in Docker
    db_port = os.getenv('DB_PORT', '5432')

    # Create SQLAlchemy engine
    engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

    try:
        # Ensure the 'staging_weather' table exists
        with engine.connect() as conn:
            conn.execute(text('''
                CREATE TABLE IF NOT EXISTS staging_weather (
                    station BIGINT,
                    date TEXT,
                    latitude DOUBLE PRECISION,
                    longitude DOUBLE PRECISION,
                    elevation DOUBLE PRECISION,
                    name TEXT,
                    temp DOUBLE PRECISION,
                    temp_attributes BIGINT,
                    dewp DOUBLE PRECISION,
                    dewp_attributes BIGINT,
                    slp DOUBLE PRECISION,
                    slp_attributes BIGINT,
                    stp DOUBLE PRECISION,
                    stp_attributes BIGINT,
                    visib DOUBLE PRECISION,
                    visib_attributes BIGINT,
                    wdsp DOUBLE PRECISION,
                    wdsp_attributes BIGINT,
                    mxspd DOUBLE PRECISION,
                    gust DOUBLE PRECISION,
                    max DOUBLE PRECISION,
                    max_attributes TEXT,
                    min DOUBLE PRECISION,
                    min_attributes TEXT,
                    prcp DOUBLE PRECISION,
                    prcp_attributes TEXT,
                    sndp DOUBLE PRECISION,
                    frshtt BIGINT
                )
            '''))
            print('Ensured staging_weather table exists.')

        # Process each CSV file in the input directory
        for file_path in glob.glob(f'{input_dir}/*.csv'):
            print(f'Processing {file_path}')
            df = pd.read_csv(file_path)

            # Insert data into PostgreSQL staging table
            df.to_sql('staging_weather', engine, if_exists='append', index=False)
            print(f'Loaded {file_path} into database')

    except Exception as e:
        print(f"Error: {e}")

    finally:
        # Ensure the connection is closed
        engine.dispose()

if __name__ == '__main__':
    load_data_to_postgres()

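For larger months, the per-file insert can be batched rather than sent in one go; chunksize and method='multi' are standard pandas to_sql parameters (the batch size here is illustrative):

df.to_sql('staging_weather', engine, if_exists='append', index=False, chunksize=5000, method='multi')
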
64 changes: 64 additions & 0 deletions 02-advanced-deployment/scripts/transform.py
@@ -0,0 +1,64 @@
# transform.py

import pandas as pd
from sqlalchemy import create_engine, text
import os

def transform_and_load_final(db_name='weather_data'):
    db_user = os.getenv('DB_USER', 'your_user')
    db_password = os.getenv('DB_PASSWORD', 'your_password')
    db_host = os.getenv('DB_HOST', 'postgres')  # Ensure this is 'postgres' in Docker
    db_port = os.getenv('DB_PORT', '5432')

    # Create SQLAlchemy engine
    engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

    try:
        with engine.connect() as conn:
            with conn.begin():  # Starts a transaction; commits on success, rolls back on error
                conn.execute(text('''
                    CREATE TABLE IF NOT EXISTS weather (
                        "STATION" BIGINT,
                        "DATE" TEXT,
                        "LATITUDE" DOUBLE PRECISION,
                        "LONGITUDE" DOUBLE PRECISION,
                        "ELEVATION" DOUBLE PRECISION,
                        "NAME" TEXT,
                        "TEMP" DOUBLE PRECISION,
                        "DEWP" DOUBLE PRECISION,
                        "SLP" DOUBLE PRECISION,
                        "STP" DOUBLE PRECISION,
                        "VISIB" DOUBLE PRECISION,
                        "WDSP" DOUBLE PRECISION,
                        "MXSPD" DOUBLE PRECISION,
                        "GUST" DOUBLE PRECISION,
                        "MAX" DOUBLE PRECISION,
                        "MIN" DOUBLE PRECISION,
                        "PRCP" DOUBLE PRECISION,
                        "SNDP" DOUBLE PRECISION,
                        "FRSHTT" BIGINT
                    )
                '''))

                # Transform data: load from staging and remove specified columns
                conn.execute(text('''
                    INSERT INTO "weather" ("STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP", "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP", "FRSHTT")
                    SELECT "STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP", "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP", "FRSHTT"
                    FROM "staging_weather"
                '''))
                print('Data transformed and loaded into weather table.')

        # The context managers above commit the transaction and close the connection,
        # so no explicit conn.commit() / conn.close() is needed.
        print('Data transformation and loading to final table completed.')

    except Exception as e:
        print(f"Error: {e}")

    finally:
        # Ensure the engine's connection pool is released
        engine.dispose()

if __name__ == '__main__':
    transform_and_load_final()

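As written, each run of the transform appends the full contents of staging_weather again, so repeated runs duplicate rows in weather. One way to make the step idempotent (a sketch, not part of this commit) is to clear the target inside the same transaction, just before the INSERT ... SELECT:

conn.execute(text('TRUNCATE TABLE "weather"'))
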