September 9, 2024 14:59

03-data-processing

3.0 Data processing

In this section, we will implement the core data processing scripts for our ETL (Extract, Transform, Load) pipeline. We'll create Python scripts for each stage of the process and shell scripts to serve as entrypoints for our Docker containers.

By the end of this section, you will have:

  1. An extract.py script to download weather data from a public S3 bucket.
  2. A load.py script to insert the downloaded data into a PostgreSQL database.
  3. A transform.py script to process the data and prepare it for analysis.
  4. Shell scripts (extract.sh, load.sh, transform.sh) to orchestrate the execution of these Python scripts in our Docker containers.

These scripts will form the backbone of our data processing pipeline, allowing us to efficiently collect, store, and prepare weather data for analysis.

Data Set Description

This project uses the NOAA Global Surface Summary of the Day (GSOD) dataset, which is available through the AWS Open Data Registry. GSOD is derived from the Integrated Surface Hourly (ISH) dataset and covers more than 9,000 weather stations worldwide. It provides daily summaries of weather observations, including temperature, wind, and precipitation, and is updated 1-2 days after the original observation. Records begin in 1929 and the dataset is currently at Version 8, which makes it a useful resource for analyzing long-term weather trends and patterns.

Prerequisites

Before starting this lesson, please ensure that you have:

  1. Completed the 02-containerized-environment lesson
  2. A basic understanding of Python programming
  3. Familiarity with SQL and database operations
  4. Basic knowledge of ETL processes

3.1 Creating folders and Python scripts

  1. Inside the data-pipeline folder, you should see two folders named scripts and entrypoints.

  2. Navigate to the scripts folder.

  3. In the scripts folder, open up the file named extract.py.

  4. Copy the code below into extract.py and save it.

    import os

    import boto3
    from botocore import UNSIGNED
    from botocore.config import Config


    def extract_noaa_gsod_data(year, month, output_dir='/data'):
        # The bucket is public, so send unsigned (anonymous) requests
        s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
        bucket_name = 'noaa-gsod-pds'
        # Note: if keys in this bucket are laid out as <year>/<station>.csv
        # (worth confirming with a quick listing), this prefix selects station
        # IDs starting with these two digits rather than a calendar month.
        prefix = f'{year}/{str(month).zfill(2)}'

        os.makedirs(output_dir, exist_ok=True)

        # list_objects_v2 returns at most 1,000 keys per call, so page through the results
        paginator = s3.get_paginator('list_objects_v2')
        downloaded = 0
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                local_path = os.path.join(output_dir, os.path.basename(key))

                # Download the file
                s3.download_file(bucket_name, key, local_path)
                print(f'Downloaded {key} to {local_path}')
                downloaded += 1

        if downloaded == 0:
            print(f'No files found for {prefix}')


    if __name__ == '__main__':
        extract_noaa_gsod_data(2020, 1)

    This Python script defines a function to extract NOAA GSOD (Global Surface Summary of the Day) data from an AWS S3 bucket. It uses the boto3 library to interact with S3, downloads weather data files for a specified year and month, and saves them to a local directory. The script demonstrates how to access public datasets stored in cloud services and handle file downloads programmatically.
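
A single `list_objects_v2` call returns at most 1,000 keys, so a prefix that matches more objects needs follow-up requests. The sketch below isolates that continuation-token loop. `iter_keys` and the `FakeS3` stand-in are hypothetical names introduced only so the example runs without network access; with a real boto3 client, `get_paginator('list_objects_v2')` does the same job.

```python
def iter_keys(client, bucket, prefix):
    """Yield every object key under `prefix`, following continuation tokens."""
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = client.list_objects_v2(**kwargs)
        for obj in response.get("Contents", []):
            yield obj["Key"]
        if not response.get("IsTruncated"):
            break
        # Ask for the next page, starting where this one stopped
        kwargs["ContinuationToken"] = response["NextContinuationToken"]


class FakeS3:
    """Hypothetical stand-in that mimics the paging fields of list_objects_v2."""

    def __init__(self, keys, page_size=2):
        self.keys = keys
        self.page_size = page_size

    def list_objects_v2(self, Bucket, Prefix, ContinuationToken="0"):
        matching = [k for k in self.keys if k.startswith(Prefix)]
        start = int(ContinuationToken)
        end = start + self.page_size
        page = {
            "Contents": [{"Key": k} for k in matching[start:end]],
            "IsTruncated": end < len(matching),
        }
        if page["IsTruncated"]:
            page["NextContinuationToken"] = str(end)
        return page


# Made-up keys, used only to exercise the loop across two pages
fake = FakeS3(["2020/a.csv", "2020/b.csv", "2020/c.csv", "2021/d.csv"])
keys = list(iter_keys(fake, "example-bucket", "2020/"))
print(keys)  # ['2020/a.csv', '2020/b.csv', '2020/c.csv']
```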

    Note: This extract script can be customized to work with your specific data source. Whether you're pulling data from a different cloud storage service, an API, or a local file system, you can modify this script to suit your needs while maintaining the same overall structure.
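
As one illustration of that kind of customization, here is a minimal sketch of an extract step that reads from a local directory instead of S3 while keeping the same shape: ensure the output directory exists, iterate over the source files, and copy each one. The function name `extract_from_local_dir`, the file names, and the sample contents are all hypothetical.

```python
import shutil
import tempfile
from pathlib import Path


def extract_from_local_dir(source_dir, output_dir, pattern="*.csv"):
    """Copy files matching `pattern` from source_dir into output_dir."""
    output = Path(output_dir)
    output.mkdir(parents=True, exist_ok=True)
    copied = []
    for src in sorted(Path(source_dir).glob(pattern)):
        dest = output / src.name
        shutil.copy2(src, dest)
        print(f"Copied {src} to {dest}")
        copied.append(dest.name)
    return copied


# Demonstration with throwaway directories and made-up files
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "source"
    source.mkdir()
    (source / "station_a.csv").write_text("STATION,TEMP\n1,30.5\n")
    (source / "notes.txt").write_text("not a data file\n")
    copied = extract_from_local_dir(source, Path(tmp) / "data")
```

Only the CSV file is copied; the text file is skipped because it does not match the pattern.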

  5. Now open up the file named load.py.

  6. Copy the code below into load.py and save it.

    import glob
    import os

    import pandas as pd
    from sqlalchemy import create_engine, text


    def load_data_to_postgres(input_dir='/data'):
        db_name = os.getenv('DB_NAME')
        db_user = os.getenv('DB_USER')
        db_password = os.getenv('DB_PASSWORD')
        db_host = os.getenv('DB_HOST')  # Ensure this is 'postgres' in Docker
        db_port = os.getenv('DB_PORT')

        # Create SQLAlchemy engine
        engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

        try:
            # Ensure the 'staging_weather' table exists.
            # engine.begin() commits the DDL on success and rolls back on error.
            # Column names are quoted in upper case so they match the CSV headers
            # that pandas inserts and the names transform.py selects.
            with engine.begin() as conn:
                conn.execute(text('''
                    CREATE TABLE IF NOT EXISTS staging_weather (
                        "STATION" BIGINT,
                        "DATE" TEXT,
                        "LATITUDE" DOUBLE PRECISION,
                        "LONGITUDE" DOUBLE PRECISION,
                        "ELEVATION" DOUBLE PRECISION,
                        "NAME" TEXT,
                        "TEMP" DOUBLE PRECISION,
                        "TEMP_ATTRIBUTES" BIGINT,
                        "DEWP" DOUBLE PRECISION,
                        "DEWP_ATTRIBUTES" BIGINT,
                        "SLP" DOUBLE PRECISION,
                        "SLP_ATTRIBUTES" BIGINT,
                        "STP" DOUBLE PRECISION,
                        "STP_ATTRIBUTES" BIGINT,
                        "VISIB" DOUBLE PRECISION,
                        "VISIB_ATTRIBUTES" BIGINT,
                        "WDSP" DOUBLE PRECISION,
                        "WDSP_ATTRIBUTES" BIGINT,
                        "MXSPD" DOUBLE PRECISION,
                        "GUST" DOUBLE PRECISION,
                        "MAX" DOUBLE PRECISION,
                        "MAX_ATTRIBUTES" TEXT,
                        "MIN" DOUBLE PRECISION,
                        "MIN_ATTRIBUTES" TEXT,
                        "PRCP" DOUBLE PRECISION,
                        "PRCP_ATTRIBUTES" TEXT,
                        "SNDP" DOUBLE PRECISION,
                        "FRSHTT" BIGINT
                    )
                '''))
            print('Ensured staging_weather table exists.')

            # Process each CSV file in the input directory
            for file_path in sorted(glob.glob(os.path.join(input_dir, '*.csv'))):
                print(f'Processing {file_path}')
                df = pd.read_csv(file_path)

                # Insert data into PostgreSQL staging table
                df.to_sql('staging_weather', engine, if_exists='append', index=False)
                print(f'Loaded {file_path} into database')

        except Exception as e:
            print(f"Error: {e}")
            # Re-raise so the script exits non-zero and load.sh (set -e)
            # does not mark the stage as done after a failure
            raise

        finally:
            # Release all pooled connections
            engine.dispose()


    if __name__ == '__main__':
        load_data_to_postgres()

    This script handles the loading of extracted weather data into a PostgreSQL database. It creates a staging table if it doesn't exist, then reads each CSV file from the input directory and loads its contents into the staging table. The script uses SQLAlchemy for database operations and pandas for data manipulation, demonstrating how to handle database connections, execute SQL commands, and load data efficiently in a containerized environment.
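
To see the staging pattern in isolation, the sketch below runs the same two steps (create the table if needed, then append rows parsed from a CSV) against an in-memory SQLite database using only the standard library. SQLite is swapped in here purely so the example runs anywhere; the table name `staging_demo`, the three-column layout, and the sample rows are made up for illustration.

```python
import csv
import io
import sqlite3

# Made-up sample rows in the same CSV shape as a trimmed-down GSOD file
SAMPLE_CSV = (
    "STATION,DATE,TEMP\n"
    "1001099999,2020-01-01,25.3\n"
    "1001099999,2020-01-02,27.1\n"
)


def load_csv_to_staging(conn, csv_text):
    """Create the staging table if needed and append every CSV row to it."""
    conn.execute(
        'CREATE TABLE IF NOT EXISTS staging_demo '
        '("STATION" INTEGER, "DATE" TEXT, "TEMP" REAL)'
    )
    rows = [
        (int(r["STATION"]), r["DATE"], float(r["TEMP"]))
        for r in csv.DictReader(io.StringIO(csv_text))
    ]
    conn.executemany("INSERT INTO staging_demo VALUES (?, ?, ?)", rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
loaded = load_csv_to_staging(conn, SAMPLE_CSV)
row_count = conn.execute("SELECT COUNT(*) FROM staging_demo").fetchone()[0]
first_temp = conn.execute(
    'SELECT "TEMP" FROM staging_demo ORDER BY "DATE"'
).fetchone()[0]
```

Calling `load_csv_to_staging` a second time appends the same rows again, which mirrors how `if_exists='append'` behaves when the load stage is re-run.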

    Note: The load and transform scripts can be adapted to work with your specific database and data processing requirements. You might need to modify the database connection details, table structures, or transformation logic to align with your research workflow.
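
When adapting the connection details, it can help to fail early if a variable is missing rather than build a URL containing `None`. The following is a minimal sketch of that idea; `build_db_url` is a hypothetical helper, the example values are invented, and the password is percent-encoded on the assumption that it may contain characters such as `@` or `/` that would otherwise break the URL.

```python
from urllib.parse import quote

REQUIRED_VARS = ("DB_NAME", "DB_USER", "DB_PASSWORD", "DB_HOST", "DB_PORT")


def build_db_url(env):
    """Build a PostgreSQL URL from a mapping such as os.environ."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # Percent-encode the password so special characters survive URL parsing
    password = quote(env["DB_PASSWORD"], safe="")
    return (
        f"postgresql+psycopg2://{env['DB_USER']}:{password}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}"
    )


# Invented example values; in the real scripts you would pass os.environ
example_env = {
    "DB_NAME": "weather",
    "DB_USER": "etl_user",
    "DB_PASSWORD": "p@ss/word",
    "DB_HOST": "postgres",
    "DB_PORT": "5432",
}
url = build_db_url(example_env)
print(url)  # postgresql+psycopg2://etl_user:p%40ss%2Fword@postgres:5432/weather
```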

  7. Finally, open up the file named transform.py.

  8. Copy the code below into transform.py and save it.

    import os

    from sqlalchemy import create_engine, text


    def transform_and_load_final():
        db_name = os.getenv('DB_NAME')
        db_user = os.getenv('DB_USER')
        db_password = os.getenv('DB_PASSWORD')
        db_host = os.getenv('DB_HOST')  # Ensure this is 'postgres' in Docker
        db_port = os.getenv('DB_PORT')

        # Create SQLAlchemy engine
        engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

        try:
            # engine.begin() opens one transaction for both statements:
            # it commits on success and rolls back if either statement fails
            with engine.begin() as conn:
                conn.execute(text('''
                    CREATE TABLE IF NOT EXISTS weather (
                        "STATION" BIGINT,
                        "DATE" TEXT,
                        "LATITUDE" DOUBLE PRECISION,
                        "LONGITUDE" DOUBLE PRECISION,
                        "ELEVATION" DOUBLE PRECISION,
                        "NAME" TEXT,
                        "TEMP" DOUBLE PRECISION,
                        "DEWP" DOUBLE PRECISION,
                        "SLP" DOUBLE PRECISION,
                        "STP" DOUBLE PRECISION,
                        "VISIB" DOUBLE PRECISION,
                        "WDSP" DOUBLE PRECISION,
                        "MXSPD" DOUBLE PRECISION,
                        "GUST" DOUBLE PRECISION,
                        "MAX" DOUBLE PRECISION,
                        "MIN" DOUBLE PRECISION,
                        "PRCP" DOUBLE PRECISION,
                        "SNDP" DOUBLE PRECISION,
                        "FRSHTT" BIGINT
                    )
                '''))

                # Transform data: copy from staging, leaving out the *_ATTRIBUTES columns
                conn.execute(text('''
                    INSERT INTO "weather" ("STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP", "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP", "FRSHTT")
                    SELECT "STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP", "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP", "FRSHTT"
                    FROM "staging_weather"
                '''))
                print('Data transformed and loaded into weather table.')

            print('Data transformation and loading to final table completed.')

        except Exception as e:
            print(f"Error: {e}")
            # Re-raise so the script exits non-zero and transform.sh (set -e)
            # does not mark the stage as done after a failure
            raise

        finally:
            # Release all pooled connections
            engine.dispose()


    if __name__ == '__main__':
        transform_and_load_final()

    This script performs the transformation step of the ETL process. It creates a final 'weather' table and populates it with selected columns from the staging table, effectively transforming the data structure. The script demonstrates how to use SQLAlchemy for executing SQL commands within a transaction, ensuring data integrity during the transformation process.
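
The transactional behavior is easiest to see on a small scale. The sketch below uses the standard library's `sqlite3` module (swapped in for PostgreSQL so it runs without a database server) to copy selected columns from a staging table into a final table, then shows that a failing transaction leaves the final table untouched. The table names and rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE staging_demo ("STATION" INTEGER, "TEMP" REAL, "TEMP_ATTRIBUTES" INTEGER)'
)
conn.execute('CREATE TABLE weather_demo ("STATION" INTEGER, "TEMP" REAL)')
conn.executemany(
    "INSERT INTO staging_demo VALUES (?, ?, ?)", [(1, 25.3, 24), (2, 31.0, 8)]
)
conn.commit()


def transform(conn):
    """Copy the kept columns from staging into the final table atomically."""
    # `with conn:` commits on success and rolls back if an exception escapes
    with conn:
        conn.execute(
            'INSERT INTO weather_demo ("STATION", "TEMP") '
            'SELECT "STATION", "TEMP" FROM staging_demo'
        )


transform(conn)
final_rows = conn.execute(
    'SELECT "STATION", "TEMP" FROM weather_demo ORDER BY "STATION"'
).fetchall()

# A transaction that fails partway through is rolled back as a whole
try:
    with conn:
        conn.execute("DELETE FROM weather_demo")
        conn.execute("INSERT INTO no_such_table VALUES (1)")
except sqlite3.OperationalError:
    pass
rows_after_failure = conn.execute("SELECT COUNT(*) FROM weather_demo").fetchone()[0]
```

After the failed block, `weather_demo` still holds both rows because the `DELETE` was rolled back together with the failing `INSERT`.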

3.2 Entrypoint scripts

The entrypoint scripts (extract.sh, load.sh, and transform.sh) are used to simplify running the containers in sequence. They ensure that each stage of the ETL process runs in the correct order and only when the previous stage has completed successfully. This simple orchestration is suitable for our workshop and small-scale projects.

However, it's important to note that in production settings or for more complex workflows, it's better to use more robust orchestration tools like Apache Airflow or Argo Workflows. These tools provide advanced features such as dependency management, error handling, and monitoring that are crucial for large-scale data pipelines. While exploring these tools is beyond the scope of this workshop, they are worth considering for more advanced data processing needs.
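
To give a feel for what dependency management means here, the sketch below declares the three stages and their prerequisites as a small graph and asks Python's standard `graphlib` module (Python 3.9+) for a valid run order. This only illustrates the idea of declaring dependencies instead of hard-coding a sequence; it is not how Airflow or Argo Workflows are configured.

```python
from graphlib import TopologicalSorter

# Each stage maps to the set of stages that must finish before it can start
dependencies = {
    "extract": set(),
    "load": {"extract"},
    "transform": {"load"},
}

run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)  # ['extract', 'load', 'transform']
```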

  1. Navigate to the entrypoints folder.

  2. In the entrypoints folder, open up the file named extract.sh.

  3. Copy the code below to extract.sh and save it.

    #!/bin/bash
    set -e
    
    # Ensure /data directory exists
    mkdir -p /data
    
    echo "Starting extract process..."
    python /usr/src/app/extract.py
    
    echo "Extract process completed."
    
    # Signal successful completion
    touch /data/extract_done

    This shell script serves as the entrypoint for the extract stage of the ETL process. It ensures the data directory exists, runs the Python extract script, and signals completion by creating a flag file. This script demonstrates how to set up a containerized process that can be easily orchestrated and monitored.

  4. Now open up the file named load.sh.

  5. Copy the code below to load.sh and save it.

    #!/bin/bash
    set -e
    
    # Ensure /data directory exists
    mkdir -p /data
    
    # Wait for extract stage to complete
    while [ ! -f /data/extract_done ]; do
        echo "Waiting for extract stage to complete..."
        sleep 5
    done
    
    # Run the load script
    python /usr/src/app/load.py
    
    # Signal successful completion
    touch /data/load_done

    This shell script is the entrypoint for the load stage. It waits for the extract stage to complete by checking for a flag file, then runs the Python load script. This script showcases how to implement simple dependencies between containerized processes, ensuring that stages run in the correct order.
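
The same flag-file wait can be written in Python, which makes it easy to add a timeout so a stage does not wait forever if the previous one never finishes. The shell loop above has no timeout; the `timeout` parameter and the `wait_for_flag` name are additions made for this sketch.

```python
import tempfile
import time
from pathlib import Path


def wait_for_flag(flag_path, timeout=60.0, poll_interval=5.0):
    """Return True once flag_path exists, or False if timeout seconds pass first."""
    deadline = time.monotonic() + timeout
    while not Path(flag_path).exists():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


# Demonstration with a throwaway directory and short timings
with tempfile.TemporaryDirectory() as tmp:
    flag = Path(tmp) / "extract_done"
    missing_result = wait_for_flag(flag, timeout=0.2, poll_interval=0.05)
    flag.touch()
    present_result = wait_for_flag(flag, timeout=0.2, poll_interval=0.05)

print(missing_result, present_result)  # False True
```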

  6. In the entrypoints folder, open up the file named transform.sh.

  7. Copy the code below to transform.sh and save it.

    #!/bin/bash
    set -e
    
    # Ensure /data directory exists
    mkdir -p /data
    
    # Wait for load stage to complete
    while [ ! -f /data/load_done ]; do
        echo "Waiting for load stage to complete..."
        sleep 5
    done
    
    # Run the transform script
    python /usr/src/app/transform.py
    
    # Signal successful completion
    touch /data/transform_done

    This shell script is the entrypoint for the transform stage. Similar to the load script, it waits for the previous stage (load) to complete before running the Python transform script. This script further demonstrates the implementation of a simple orchestration system for a multi-stage data pipeline in a containerized environment.
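
The stop-on-failure behavior that `set -e` gives each entrypoint can also be sketched as a single Python runner that executes stages in order and halts at the first non-zero exit code. This is an illustrative alternative, not part of the workshop setup; `run_stages` is a hypothetical helper, and the stage commands are placeholder one-liners where the second is made to fail on purpose.

```python
import subprocess
import sys


def run_stages(stages):
    """Run (name, command) pairs in order; stop at the first failing stage."""
    completed = []
    for name, command in stages:
        result = subprocess.run(command)
        if result.returncode != 0:
            print(f"Stage {name} failed with exit code {result.returncode}")
            break
        completed.append(name)
    return completed


# Placeholder commands; the "load" stage exits with status 1 deliberately
stages = [
    ("extract", [sys.executable, "-c", "print('extract ok')"]),
    ("load", [sys.executable, "-c", "import sys; sys.exit(1)"]),
    ("transform", [sys.executable, "-c", "print('transform ok')"]),
]
completed = run_stages(stages)
print(completed)  # ['extract']
```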

3.3 Run the data pipeline

  1. Open up your terminal and navigate to the data-pipeline folder by running:

    cd data-pipeline

  2. Then run the following command to build and deploy the ETL stack.

    docker-compose up

    This command uses Docker Compose to build and start all the services defined in your docker-compose.yml file. It will create and run containers for each stage of your ETL process (extract, load, transform) as well as the PostgreSQL database, orchestrating the entire data pipeline. This demonstrates how containerization simplifies the deployment and execution of complex, multi-stage data processing workflows.

    If everything is working properly, your terminal should show output like the GIF below.
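
Besides watching the terminal, you can check progress by looking for the flag files that each entrypoint script creates. The sketch below reports which stages have finished; it assumes the directory holding the flags is reachable from wherever you run it (for example through a mounted volume), and `pipeline_status` is a hypothetical helper name.

```python
import tempfile
from pathlib import Path

STAGES = ("extract", "load", "transform")


def pipeline_status(data_dir):
    """Map each stage name to True if its <stage>_done flag file exists."""
    return {stage: (Path(data_dir) / f"{stage}_done").exists() for stage in STAGES}


# Demonstration with a throwaway directory standing in for /data
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "extract_done").touch()
    (Path(tmp) / "load_done").touch()
    status = pipeline_status(tmp)

print(status)  # {'extract': True, 'load': True, 'transform': False}
```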

Conclusion

In this lesson, you learned how to implement the Extract, Transform, and Load (ETL) processes within our containerized environment. You created Python scripts for extracting data from an external source, loading it into a PostgreSQL database, and transforming it. You also set up shell scripts to run these Python scripts in the correct order within Docker containers.

These scripts form the backbone of our data processing pipeline, allowing us to efficiently collect, store, and prepare weather data for analysis. The containerized approach ensures consistency across different environments and simplifies the deployment process.

It's important to remember that while we've used specific data sources and processing steps in this workshop, the scripts and workflow we've created are highly customizable. You can adapt the extract script to pull data from your own sources, modify the load script to work with your preferred database system, and customize the transform script to perform the specific data processing tasks required for your research.

Key Points

  • The extract, load, and transform scripts can be customized to work with different data sources, databases, and processing requirements
  • The extract script connects to an S3 bucket to download weather data, but can be modified to pull data from other sources
  • The load script reads CSV files and loads them into a staging table in PostgreSQL, but can be adapted for other file formats and database systems
  • The transform script processes data from the staging table and loads it into a final table, and can be customized for specific data transformation needs
  • Entrypoint shell scripts ensure proper sequencing of the ETL processes in the Docker environment
  • While our simple shell script orchestration works for this workshop, production environments often use more robust tools like Airflow or Argo for complex workflows
  • Docker Compose orchestrates the entire data pipeline, simplifying deployment and execution

Further Reading