diff --git a/.DS_Store b/.DS_Store
index 57ae208..5b2875d 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/.gitignore b/.gitignore
index f059112..bf79bb2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,5 +6,6 @@
 01-basic-deployment/scripts/extract_done
 01-basic-deployment/scripts/transform_done
-# ignore the data files from 02-advanced-deployment
-02-advanced-deployment/docker/data/*
+# ignore drafts
+01-basic-deployment-new/*
+02-advanced-deployment-old/*
diff --git a/02-advanced-deployment/.env b/02-advanced-deployment/.env
new file mode 100644
index 0000000..2fd4ce7
--- /dev/null
+++ b/02-advanced-deployment/.env
@@ -0,0 +1,5 @@
+DB_NAME=weather_data
+DB_USER=your_user
+DB_PASSWORD=your_password
+DB_HOST=postgres
+DB_PORT=5432
diff --git a/02-advanced-deployment/Dockerfile b/02-advanced-deployment/Dockerfile
new file mode 100644
index 0000000..4d5b4aa
--- /dev/null
+++ b/02-advanced-deployment/Dockerfile
@@ -0,0 +1,29 @@
+# Base image
+FROM python:3.9-slim AS base
+WORKDIR /usr/src/app
+COPY requirements.txt ./
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Extract stage
+FROM base AS extract
+# WORKDIR is inherited from base. Note that Dockerfile comments must start a
+# line: inline "# ..." text after an instruction is parsed as its argument.
+COPY scripts/extract.py ./
+# Keep entrypoints outside /usr/src/app so the bind mount in docker-compose.yml cannot shadow them
+COPY entrypoints/extract.sh /usr/src/entrypoints/
+RUN chmod +x /usr/src/entrypoints/extract.sh
+ENTRYPOINT ["sh", "/usr/src/entrypoints/extract.sh"]
+
+# Load stage
+FROM base AS load
+COPY scripts/load.py ./
+COPY entrypoints/load.sh /usr/src/entrypoints/
+RUN chmod +x /usr/src/entrypoints/load.sh
+ENTRYPOINT ["sh", "/usr/src/entrypoints/load.sh"]
+
+# Transform stage
+FROM base AS transform
+COPY scripts/transform.py ./
+COPY entrypoints/transform.sh /usr/src/entrypoints/
+RUN chmod +x /usr/src/entrypoints/transform.sh
+ENTRYPOINT ["sh", "/usr/src/entrypoints/transform.sh"]
diff --git a/02-advanced-deployment/docker-compose.yml b/02-advanced-deployment/docker-compose.yml
new file mode 100644
index 0000000..0d1b711
--- /dev/null
+++ b/02-advanced-deployment/docker-compose.yml
@@ -0,0 +1,64 @@
+version: '3'
+services:
+  postgres:
+    image: postgres:13
+    environment:
+      POSTGRES_USER: ${DB_USER}
+      POSTGRES_PASSWORD: ${DB_PASSWORD}
+      POSTGRES_DB: ${DB_NAME}
+    ports:
+      - "5432:5432"
+    volumes:
+      - pgdata:/var/lib/postgresql/data
+
+  extract:
+    build:
+      context: .
+      target: extract
+    environment:
+      - DB_NAME=${DB_NAME}
+      - DB_USER=${DB_USER}
+      - DB_PASSWORD=${DB_PASSWORD}
+      - DB_HOST=${DB_HOST}
+      - DB_PORT=${DB_PORT}
+    volumes:
+      - ./scripts:/usr/src/app
+      - shared-data:/data
+    depends_on:
+      - postgres
+
+  load:
+    build:
+      context: .
+      target: load
+    environment:
+      - DB_NAME=${DB_NAME}
+      - DB_USER=${DB_USER}
+      - DB_PASSWORD=${DB_PASSWORD}
+      - DB_HOST=${DB_HOST}
+      - DB_PORT=${DB_PORT}
+    volumes:
+      - ./scripts:/usr/src/app
+      - shared-data:/data
+    depends_on:
+      - extract
+
+  transform:
+    build:
+      context: .
+      target: transform
+    environment:
+      - DB_NAME=${DB_NAME}
+      - DB_USER=${DB_USER}
+      - DB_PASSWORD=${DB_PASSWORD}
+      - DB_HOST=${DB_HOST}
+      - DB_PORT=${DB_PORT}
+    volumes:
+      - ./scripts:/usr/src/app
+      - shared-data:/data
+    depends_on:
+      - load
+
+volumes:
+  pgdata:
+  shared-data:
\ No newline at end of file
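With the sample .env above sitting next to docker-compose.yml, Compose substitutes the DB_* values into the postgres service and all three pipeline stages. A minimal sketch of running the staged pipeline, assuming a Docker Compose v2 CLI:

    # Build the extract, load, and transform stage images, then run the
    # pipeline; the stages coordinate via sentinel files on the shared-data
    # volume (see the entrypoint scripts below).
    docker compose build
    docker compose up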
diff --git a/02-advanced-deployment/docker/Dockerfile b/02-advanced-deployment/docker/Dockerfile
deleted file mode 100644
index 6128e94..0000000
--- a/02-advanced-deployment/docker/Dockerfile
+++ /dev/null
@@ -1,21 +0,0 @@
-# Use an official Python runtime as a parent image
-FROM python:3.9-slim
-
-# Set the working directory
-WORKDIR /usr/src/app
-
-# Install required libraries
-COPY requirements.txt ./
-RUN pip install --no-cache-dir -r requirements.txt
-
-# Copy the current directory contents into the container at /usr/src/app
-COPY . .
-
-# Define environment variable
-ENV OUTPUT_DIR=data
-
-# Make port 80 available to the world outside this container
-EXPOSE 80
-
-# Run script when the container launches
-CMD ["python", "./extract_and_transform.py"]
diff --git a/02-advanced-deployment/docker/docker-compose.yml b/02-advanced-deployment/docker/docker-compose.yml
deleted file mode 100644
index cd969e0..0000000
--- a/02-advanced-deployment/docker/docker-compose.yml
+++ /dev/null
@@ -1,28 +0,0 @@
-version: '3'
-services:
-  postgres:
-    image: postgres:13
-    environment:
-      POSTGRES_USER: your_user
-      POSTGRES_PASSWORD: your_password
-      POSTGRES_DB: weather_data
-    ports:
-      - "5432:5432"
-    volumes:
-      - pgdata:/var/lib/postgresql/data
-  app:
-    build: .
-    depends_on:
-      - postgres
-    environment:
-      - DB_NAME=weather_data
-      - DB_USER=your_user
-      - DB_PASSWORD=your_password
-      - DB_HOST=postgres
-      - DB_PORT=5432
-    volumes:
-      - .:/usr/src/app
-    command: python extract_and_transform.py
-
-volumes:
-  pgdata:
diff --git a/02-advanced-deployment/docker/extract_and_transform.py b/02-advanced-deployment/docker/extract_and_transform.py
deleted file mode 100644
index 5ebbb1b..0000000
--- a/02-advanced-deployment/docker/extract_and_transform.py
+++ /dev/null
@@ -1,87 +0,0 @@
-import boto3
-from botocore.config import Config
-from botocore import UNSIGNED
-import os
-import pandas as pd
-import glob
-from sqlalchemy import create_engine, text
-
-def extract_noaa_gsod_data(year, month, output_dir='data'):
-    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
-    bucket_name = 'noaa-gsod-pds'
-    prefix = f'{year}/{str(month).zfill(2)}'
-
-    if not os.path.exists(output_dir):
-        os.makedirs(output_dir)
-
-    # List objects in the bucket for the specified month
-    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
-
-    if 'Contents' not in response:
-        print(f"No files found for {prefix}")
-        return
-
-    for obj in response['Contents']:
-        key = obj['Key']
-        local_path = os.path.join(output_dir, os.path.basename(key))
-
-        # Download the file
-        s3.download_file(bucket_name, key, local_path)
-        print(f'Downloaded {key} to {local_path}')
-
-def transform_and_load_to_postgres(input_dir='data', db_name='weather_data'):
-    db_user = os.getenv('DB_USER', 'your_user')
-    db_password = os.getenv('DB_PASSWORD', 'your_password')
-    db_host = os.getenv('DB_HOST', 'postgres')  # Ensure this is 'postgres' in Docker
-    db_port = os.getenv('DB_PORT', '5432')
-
-    # Create SQLAlchemy engine
-    engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
-
-    # Ensure the 'weather' table exists
-    with engine.connect() as conn:
-        conn.execute(text('''
-            CREATE TABLE IF NOT EXISTS weather (
-                station TEXT,
-                date TEXT,
-                latitude REAL,
-                longitude REAL,
-                elevation REAL,
-                name TEXT,
-                temp REAL,
-                temp_attributes TEXT,
-                dewp REAL,
-                dewp_attributes TEXT,
-                slp REAL,
-                slp_attributes TEXT,
-                stp REAL,
-                stp_attributes TEXT,
-                visib REAL,
-                visib_attributes TEXT,
-                wdsp REAL,
-                wdsp_attributes TEXT,
-                mxspd REAL,
-                gust REAL,
-                max REAL,
-                max_attributes TEXT,
-                min REAL,
-                min_attributes TEXT,
-                prcp REAL,
-                prcp_attributes TEXT,
-                sndp REAL,
-                frshtt TEXT
-            )
-        '''))
-
-    # Process each CSV file in the input directory
-    for file_path in glob.glob(f'{input_dir}/*.csv'):
-        print(f'Processing {file_path}')
-        df = pd.read_csv(file_path)
-
-        # Insert data into PostgreSQL table
-        df.to_sql('weather', engine, if_exists='append', index=False)
-        print(f'Loaded {file_path} into database')
-
-if __name__ == '__main__':
-    extract_noaa_gsod_data(2020, 1)
-    transform_and_load_to_postgres()
\ No newline at end of file
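The deleted monolith above downloaded the GSOD CSVs and wrote them straight into the final table in one process. For quick debugging, the new split pipeline can still be run sequentially outside Docker; a sketch assuming a Postgres reachable on localhost, exported DB_* variables matching the sample .env, and a writable /data directory (extract.py's default output_dir):

    export DB_HOST=localhost DB_NAME=weather_data DB_USER=your_user \
           DB_PASSWORD=your_password DB_PORT=5432
    python scripts/extract.py && python scripts/load.py && python scripts/transform.py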
diff --git a/02-advanced-deployment/entrypoints/extract.sh b/02-advanced-deployment/entrypoints/extract.sh
new file mode 100755
index 0000000..0ee7718
--- /dev/null
+++ b/02-advanced-deployment/entrypoints/extract.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+set -e
+
+# Ensure /data directory exists
+mkdir -p /data
+
+echo "Starting extract process..."
+python /usr/src/app/extract.py
+
+echo "Extract process completed."
+
+# Signal successful completion
+touch /data/extract_done
diff --git a/02-advanced-deployment/entrypoints/load.sh b/02-advanced-deployment/entrypoints/load.sh
new file mode 100755
index 0000000..db4e688
--- /dev/null
+++ b/02-advanced-deployment/entrypoints/load.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+set -e
+
+# Ensure /data directory exists
+mkdir -p /data
+
+# Wait for extract stage to complete
+while [ ! -f /data/extract_done ]; do
+  echo "Waiting for extract stage to complete..."
+  sleep 5
+done
+
+# Run the load script
+python /usr/src/app/load.py
+
+# Signal successful completion
+touch /data/load_done
diff --git a/02-advanced-deployment/entrypoints/transform.sh b/02-advanced-deployment/entrypoints/transform.sh
new file mode 100755
index 0000000..94f53c3
--- /dev/null
+++ b/02-advanced-deployment/entrypoints/transform.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+set -e
+
+# Ensure /data directory exists
+mkdir -p /data
+
+# Wait for load stage to complete
+while [ ! -f /data/load_done ]; do
+  echo "Waiting for load stage to complete..."
+  sleep 5
+done
+
+# Run the transform script
+python /usr/src/app/transform.py
+
+# Signal successful completion
+touch /data/transform_done
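One caveat with this sentinel-file handshake: if an upstream stage crashes before touching its done-file, the downstream loop spins forever. A sketch of a bounded wait that could replace the loop in transform.sh (MAX_WAIT and waited are illustrative names, not part of this diff):

    # Fail after ~10 minutes instead of waiting indefinitely.
    MAX_WAIT=600
    waited=0
    while [ ! -f /data/load_done ]; do
      if [ "$waited" -ge "$MAX_WAIT" ]; then
        echo "Timed out waiting for load stage" >&2
        exit 1
      fi
      echo "Waiting for load stage to complete..."
      sleep 5
      waited=$((waited + 5))
    done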
diff --git a/02-advanced-deployment/docker/requirements.txt b/02-advanced-deployment/requirements.txt
similarity index 100%
rename from 02-advanced-deployment/docker/requirements.txt
rename to 02-advanced-deployment/requirements.txt
diff --git a/02-advanced-deployment/scripts/extract.py b/02-advanced-deployment/scripts/extract.py
new file mode 100644
index 0000000..5117953
--- /dev/null
+++ b/02-advanced-deployment/scripts/extract.py
@@ -0,0 +1,34 @@
+# extract.py
+
+import boto3
+from botocore.config import Config
+from botocore import UNSIGNED
+import os
+
+def extract_noaa_gsod_data(year, month, output_dir='/data'):
+    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
+    bucket_name = 'noaa-gsod-pds'
+    prefix = f'{year}/{str(month).zfill(2)}'
+
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+
+    # List objects in the bucket for the specified month.
+    # Paginate: a single list_objects_v2 call returns at most 1000 keys.
+    paginator = s3.get_paginator('list_objects_v2')
+    found = False
+    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
+        for obj in page.get('Contents', []):
+            found = True
+            key = obj['Key']
+            local_path = os.path.join(output_dir, os.path.basename(key))
+
+            # Download the file
+            s3.download_file(bucket_name, key, local_path)
+            print(f'Downloaded {key} to {local_path}')
+
+    if not found:
+        print(f"No files found for {prefix}")
+
+if __name__ == '__main__':
+    extract_noaa_gsod_data(2020, 1)
diff --git a/02-advanced-deployment/scripts/load.py b/02-advanced-deployment/scripts/load.py
new file mode 100644
index 0000000..f409722
--- /dev/null
+++ b/02-advanced-deployment/scripts/load.py
@@ -0,0 +1,76 @@
+# load.py
+
+import pandas as pd
+import glob
+import os
+from sqlalchemy import create_engine, text
+
+def load_data_to_postgres(input_dir='/data', db_name=None):
+    # Honor the DB_* environment variables provided by docker-compose
+    db_name = db_name or os.getenv('DB_NAME', 'weather_data')
+    db_user = os.getenv('DB_USER', 'your_user')
+    db_password = os.getenv('DB_PASSWORD', 'your_password')
+    db_host = os.getenv('DB_HOST', 'postgres')  # Ensure this is 'postgres' in Docker
+    db_port = os.getenv('DB_PORT', '5432')
+
+    # Create SQLAlchemy engine
+    engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
+
+    try:
+        # Ensure the 'staging_weather' table exists. engine.begin() commits the
+        # DDL when the block exits; a plain connect() leaves it uncommitted on
+        # SQLAlchemy 2.x. Columns are quoted uppercase to match the NOAA GSOD
+        # CSV headers that pandas.to_sql uses below.
+        with engine.begin() as conn:
+            conn.execute(text('''
+                CREATE TABLE IF NOT EXISTS staging_weather (
+                    "STATION" BIGINT,
+                    "DATE" TEXT,
+                    "LATITUDE" DOUBLE PRECISION,
+                    "LONGITUDE" DOUBLE PRECISION,
+                    "ELEVATION" DOUBLE PRECISION,
+                    "NAME" TEXT,
+                    "TEMP" DOUBLE PRECISION,
+                    "TEMP_ATTRIBUTES" BIGINT,
+                    "DEWP" DOUBLE PRECISION,
+                    "DEWP_ATTRIBUTES" BIGINT,
+                    "SLP" DOUBLE PRECISION,
+                    "SLP_ATTRIBUTES" BIGINT,
+                    "STP" DOUBLE PRECISION,
+                    "STP_ATTRIBUTES" BIGINT,
+                    "VISIB" DOUBLE PRECISION,
+                    "VISIB_ATTRIBUTES" BIGINT,
+                    "WDSP" DOUBLE PRECISION,
+                    "WDSP_ATTRIBUTES" BIGINT,
+                    "MXSPD" DOUBLE PRECISION,
+                    "GUST" DOUBLE PRECISION,
+                    "MAX" DOUBLE PRECISION,
+                    "MAX_ATTRIBUTES" TEXT,
+                    "MIN" DOUBLE PRECISION,
+                    "MIN_ATTRIBUTES" TEXT,
+                    "PRCP" DOUBLE PRECISION,
+                    "PRCP_ATTRIBUTES" TEXT,
+                    "SNDP" DOUBLE PRECISION,
+                    "FRSHTT" BIGINT
+                )
+            '''))
+        print('Ensured staging_weather table exists.')
+
+        # Process each CSV file in the input directory
+        for file_path in glob.glob(f'{input_dir}/*.csv'):
+            print(f'Processing {file_path}')
+            df = pd.read_csv(file_path)
+
+            # Insert data into PostgreSQL staging table
+            df.to_sql('staging_weather', engine, if_exists='append', index=False)
+            print(f'Loaded {file_path} into database')
+
+    except Exception as e:
+        print(f"Error: {e}")
+
+    finally:
+        # Dispose of the engine's connection pool
+        engine.dispose()
+
+if __name__ == '__main__':
+    load_data_to_postgres()
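To spot-check that load.py populated the staging table, one can query through the running postgres service; a sketch using the credentials from the sample .env (adjust if you changed them):

    docker compose exec postgres \
      psql -U your_user -d weather_data -c 'SELECT COUNT(*) FROM staging_weather;'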
diff --git a/02-advanced-deployment/scripts/transform.py b/02-advanced-deployment/scripts/transform.py
new file mode 100644
index 0000000..557d0d3
--- /dev/null
+++ b/02-advanced-deployment/scripts/transform.py
@@ -0,0 +1,64 @@
+# transform.py
+
+from sqlalchemy import create_engine, text
+import os
+
+def transform_and_load_final(db_name=None):
+    # Honor the DB_* environment variables provided by docker-compose
+    db_name = db_name or os.getenv('DB_NAME', 'weather_data')
+    db_user = os.getenv('DB_USER', 'your_user')
+    db_password = os.getenv('DB_PASSWORD', 'your_password')
+    db_host = os.getenv('DB_HOST', 'postgres')  # Ensure this is 'postgres' in Docker
+    db_port = os.getenv('DB_PORT', '5432')
+
+    # Create SQLAlchemy engine
+    engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
+
+    try:
+        with engine.connect() as conn:
+            with conn.begin():  # Starts a transaction; commits when the block exits
+                conn.execute(text('''
+                    CREATE TABLE IF NOT EXISTS weather (
+                        "STATION" BIGINT,
+                        "DATE" TEXT,
+                        "LATITUDE" DOUBLE PRECISION,
+                        "LONGITUDE" DOUBLE PRECISION,
+                        "ELEVATION" DOUBLE PRECISION,
+                        "NAME" TEXT,
+                        "TEMP" DOUBLE PRECISION,
+                        "DEWP" DOUBLE PRECISION,
+                        "SLP" DOUBLE PRECISION,
+                        "STP" DOUBLE PRECISION,
+                        "VISIB" DOUBLE PRECISION,
+                        "WDSP" DOUBLE PRECISION,
+                        "MXSPD" DOUBLE PRECISION,
+                        "GUST" DOUBLE PRECISION,
+                        "MAX" DOUBLE PRECISION,
+                        "MIN" DOUBLE PRECISION,
+                        "PRCP" DOUBLE PRECISION,
+                        "SNDP" DOUBLE PRECISION,
+                        "FRSHTT" BIGINT
+                    )
+                '''))
+
+                # Transform: copy from staging, dropping the *_ATTRIBUTES columns
+                conn.execute(text('''
+                    INSERT INTO "weather" ("STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP", "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP", "FRSHTT")
+                    SELECT "STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP", "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP", "FRSHTT"
+                    FROM "staging_weather"
+                '''))
+                print('Data transformed and loaded into weather table.')
+
+        # The transaction commits when the begin() block exits and the connection
+        # closes when the connect() block exits; no explicit commit()/close() needed.
+        print('Data transformation and loading to final table completed.')
+
+    except Exception as e:
+        print(f"Error: {e}")
+
+    finally:
+        # Dispose of the engine's connection pool
+        engine.dispose()
+
+if __name__ == '__main__':
+    transform_and_load_final()
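Once transform.sh touches /data/transform_done the pipeline is finished. A sketch for verifying the final table and tearing the stack down, again assuming the sample .env credentials:

    docker compose exec postgres \
      psql -U your_user -d weather_data -c 'SELECT COUNT(*) FROM weather;'
    docker compose down   # add -v to also remove the pgdata and shared-data volumes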