Building ETL Job: Transferring Data from MySQL to Redshift using Python

Extract, Transform, Load (ETL) is a data pipeline process that involves extracting data from a source system, transforming it in some way, and then loading it into a target system. In this article, we’ll demonstrate how to build an ETL job that extracts data from a MySQL database and loads it into a Redshift data warehouse. We’ll also implement the Change Data Capture (CDC) concept to capture delta changes and trigger this ETL job every hour.

Using a Plain Python Script

Prerequisites

  • Python 3 installed on your local machine
  • MySQL and AWS Redshift instances up and running
  • mysql-connector-python and psycopg2 Python libraries installed
  • An orders table in your MySQL database with create_date and update_date columns (a sketch of the assumed schema follows this list)
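All of the examples in this article assume an orders table roughly like the one below. Only the create_date and update_date columns are actually required; the other columns are illustrative placeholders. A minimal sketch that creates such a table with mysql-connector-python:
Python
import mysql.connector

# Hypothetical schema for the orders table used throughout this article.
# Only create_date and update_date are required by the examples; the other
# columns are placeholders, so adjust them to your real table.
ORDERS_DDL = """
CREATE TABLE IF NOT EXISTS orders (
    id INT PRIMARY KEY,
    customer_id INT,
    amount DECIMAL(10, 2),
    create_date DATETIME,
    update_date DATETIME
)
"""

connection = mysql.connector.connect(user='mysql_user', password='mysql_password',
                                     host='mysql_host', database='mysql_database')
cursor = connection.cursor()
cursor.execute(ORDERS_DDL)
connection.commit()
cursor.close()
connection.close()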

Step 1: Extract Data from MySQL

First, we will extract the data from the MySQL database using the mysql-connector-python library. Here's a simple Python function that connects to a MySQL database and fetches new records from the orders table:
Python
import mysql.connector
from datetime import datetime

# set last_update to the start of epoch
last_update = datetime(1970, 1, 1)

def extract_new_records():
    global last_update
    connection = mysql.connector.connect(user='mysql_user', password='mysql_password',
                                        host='mysql_host', database='mysql_database')

    cursor = connection.cursor()
    query = f"""SELECT * FROM orders
                WHERE update_date > '{last_update.strftime("%Y-%m-%d %H:%M:%S")}'"""
    cursor.execute(query)

    records = cursor.fetchall()
    last_update = datetime.now()
    cursor.close()
    connection.close()
    return records

Step 2: Load Data to Redshift

The next step is to load the extracted data into Redshift. We’ll use the psycopg2 library to insert the records into the Redshift table:
Python
import psycopg2

def load_to_redshift(records):
    connection = psycopg2.connect(user='redshift_user', password='redshift_password',
                                  host='redshift_host', port='redshift_port', database='redshift_database')

    cursor = connection.cursor()
    for record in records:
        # Use a parameterized insert so values (strings, dates, NULLs) are escaped correctly
        placeholders = ", ".join(["%s"] * len(record))
        cursor.execute(f"INSERT INTO orders VALUES ({placeholders})", record)
    connection.commit()
    cursor.close()
    connection.close()

Step 3: Schedule the ETL Job

Now that we have our extract and load functions, we need to schedule our ETL job to run every hour. We can use the schedule library for this:
Python
import schedule
import time

def etl_job():
    records = extract_new_records()
    load_to_redshift(records)

# Schedule the job every hour
schedule.every(1).hours.do(etl_job)

# Keep the script running
while True:
    schedule.run_pending()
    time.sleep(1)

Local Development Steps

  1. Install required libraries: Run pip install mysql-connector-python psycopg2-binary schedule in your terminal.
  2. Save the above code in a Python file, e.g., etl_job.py.
  3. Update the MySQL and Redshift connection details in the extract_new_records and load_to_redshift functions.
  4. Run the script: Run python etl_job.py in your terminal to start the ETL job.

Using Airflow

Apache Airflow is a powerful platform used to programmatically author, schedule, and monitor workflows.
Here’s how you might set up the ETL job using Airflow.

Prerequisites

  • Apache Airflow is installed on your local machine.
  • MySQL and AWS Redshift instances are up and running.
  • mysql-connector-python and psycopg2 Python libraries installed.
  • An orders table in your MySQL database with create_date and update_date columns.

Step 1: Define the Extraction & Loading Functions

First, we will define the extraction and loading functions in Python.
Python
import mysql.connector
import psycopg2
from datetime import datetime
from airflow.models import Variable

def extract_new_records():
    # Fetch the last_update watermark from an Airflow Variable; if it doesn't exist yet, default to the start of the epoch
    last_update = Variable.get("last_update", default_var="1970-01-01 00:00:00")
    connection = mysql.connector.connect(user='mysql_user', password='mysql_password',
                                        host='mysql_host', database='mysql_database')

    cursor = connection.cursor()
    query = f"""SELECT * FROM orders
                WHERE update_date > '{last_update.strftime("%Y-%m-%d %H:%M:%S")}'"""
    cursor.execute(query)

    records = cursor.fetchall()
    last_update = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Persist the new watermark back to the Airflow Variable
    Variable.set("last_update", last_update)
    cursor.close()
    connection.close()
    return records

def load_to_redshift(records):
    connection = psycopg2.connect(user='redshift_user', password='redshift_password',
                                  host='redshift_host', port='redshift_port', database='redshift_database')

    cursor = connection.cursor()
    for record in records:
        # Use a parameterized insert so values are escaped correctly
        placeholders = ", ".join(["%s"] * len(record))
        cursor.execute(f"INSERT INTO orders VALUES ({placeholders})", record)
    connection.commit()
    cursor.close()
    connection.close()

Step 2: Define Airflow DAG and Tasks

Create a new DAG in your Airflow DAG directory (typically $AIRFLOW_HOME/dags). This example assumes that file is named mysql_to_redshift.py.
Python
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from etl_functions import extract_new_records, load_to_redshift  # assuming the above functions are in etl_functions.py file

# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'mysql_to_redshift_cdc',
    default_args=default_args,
    description='A simple ETL job',
    schedule_interval=timedelta(hours=1),
)

# Define the tasks
extract_task = PythonOperator(
    task_id='extract_from_mysql',
    python_callable=extract_new_records,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_to_redshift',
    python_callable=load_to_redshift,
    dag=dag,
)

# Set the task dependencies
extract_task >> load_task
Note: In Apache Airflow, you can pass data between tasks using XComs (cross-communication), which is a mechanism that lets tasks exchange messages or small amounts of data. The data is usually stored in the Airflow metadata database and can be fetched by dependent tasks.
Here’s a modified version of the code above that uses XComs:
Python
# Step 1: Extract Data from MySQL and load to Redshift
# ... (unchanged import statements)

def extract_new_records(**kwargs):
    # ... (same extraction logic as before, including persisting last_update to the Airflow Variable)

    # Push records to XCom
    kwargs['ti'].xcom_push(key='extracted_records', value=records)

def load_to_redshift(**kwargs):
    # Fetch records from XCom
    ti = kwargs['ti']
    records = ti.xcom_pull(task_ids='extract_from_mysql', key='extracted_records')

    # ... (same as before)

# Step 2: Define Airflow DAG and Tasks
# ... (unchanged import statements)

# Define the DAG
# ... (same as before)

# Define the tasks
extract_task = PythonOperator(
    task_id='extract_from_mysql',
    python_callable=extract_new_records,
    provide_context=True,  # Needed on Airflow 1.10; on Airflow 2.x the context is passed to the callable automatically
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_to_redshift',
    python_callable=load_to_redshift,
    provide_context=True,  # Needed on Airflow 1.10; on Airflow 2.x the context is passed to the callable automatically
    dag=dag,
)

# Set the task dependencies
extract_task >> load_task
In this updated code:
  1. extract_new_records() now pushes the records to XCom after fetching them.
  2. load_to_redshift() pulls the records from XCom before inserting them into the Redshift database.
  3. Both tasks have provide_context=True set, so you can use kwargs['ti'] to access the task instance and, consequently, use XComs.

Local Development Steps

  1. Install required libraries: Run pip install apache-airflow mysql-connector-python psycopg2-binary in your terminal.
  2. Save the above Python functions in a file, e.g., etl_functions.py.
  3. Update the MySQL and Redshift connection details in the extract_new_records and load_to_redshift functions.
  4. Save the DAG file in your Airflow DAGs directory.
  5. Start the Airflow webserver and scheduler: run airflow webserver and airflow scheduler in your terminal (a typical command sequence is shown below).
  6. Open the Airflow UI in your web browser (usually localhost:8080); you should see your DAG mysql_to_redshift_cdc. Trigger the DAG and monitor the execution.
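For reference, on a fresh local Airflow installation the startup sequence might look like this (the admin credentials here are placeholders; change them to your own):
Plain Text
airflow db init
airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email [email protected]
airflow webserver --port 8080
airflow scheduler
Run the webserver and the scheduler in separate terminals, or daemonize them with the -D flag.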
Below are the steps to set up a Docker environment for running the Airflow ETL job locally.

Step 1: Create Dockerfile

First, let’s create a Dockerfile to set up the Python environment and install the necessary dependencies.
Dockerfile
# Use an official Python runtime as a parent image
FROM python:3.8-slim-buster

# Set the working directory
WORKDIR /usr/src/app

# Install Airflow and other dependencies
RUN pip install --no-cache-dir apache-airflow mysql-connector-python psycopg2-binary

# Copy the current directory contents into the container at /usr/src/app
COPY . .

# Expose default airflow port
EXPOSE 8080

# Initialize the metadata DB, create an admin user, then start the scheduler (daemonized)
# and the webserver in the foreground so the container keeps running
CMD airflow db init && \
    airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email [email protected] && \
    airflow scheduler -D && \
    airflow webserver --port 8080

Step 2: Create docker-compose.yml file

Next, we’ll use Docker Compose to spin up our Airflow service. Docker Compose allows us to define and run multi-container Docker applications. Create a docker-compose.yml file in the same directory as your Dockerfile:
Plain Text
version: '3'
services:
  airflow:
    build: .
    volumes:
      - ./dags:/usr/src/app/dags
    ports:
      - "8080:8080"
This Compose file defines an Airflow service built from the current directory’s Dockerfile, maps the local dags directory to the dags directory in the container, and forwards the container's port 8080 to port 8080 on the host machine.

Step 3: Build and Run the Docker Container

Finally, use the following commands to build and run your Docker container:
Plain Text
docker-compose build
docker-compose up
Now, you should be able to access the Airflow webserver at localhost:8080. You can trigger your DAG and monitor the execution from the webserver.

Local Development Steps

  1. Ensure Docker and Docker Compose are installed on your local machine.
  2. Save the above Dockerfile in your project directory.
  3. Save your Airflow DAG and Python functions in a dags directory in your project directory.
  4. Save the docker-compose.yml file in your project directory.
  5. Build and run your Docker container using the docker-compose build and docker-compose up commands.

Using Luigi

Luigi is a Python module developed by Spotify that helps to build complex pipelines of batch jobs. Here’s how to implement an ETL job with Luigi that extracts data from a MySQL database, implements Change Data Capture (CDC), and loads it into an Amazon Redshift data warehouse.

Prerequisites

  • Python 3 installed on your local machine
  • MySQL and AWS Redshift instances up and running
  • mysql-connector-python, psycopg2, and luigi Python libraries installed
  • An orders table in your MySQL database with create_date and update_date columns
  • A last_update.txt file to store the timestamp of the last update (one way to initialize it is shown right after this list)
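Before the first run, the watermark file needs to exist. A minimal way to seed it (assuming the same timestamp format used in the queries below):
Python
# Run once before the first job: seed the watermark with the start of the epoch
with open('last_update.txt', 'w') as f:
    f.write('1970-01-01 00:00:00')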
Firstly, let’s create a config.py file for our database credentials:
Python
MYSQL_CREDENTIALS = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

REDSHIFT_CREDENTIALS = {
    'user': 'redshift_user',
    'password': 'redshift_password',
    'host': 'redshift_host',
    'port': 'redshift_port',
    'database': 'redshift_database'
}
Now, let’s create our Luigi tasks in a file named etl_tasks.py:
Python
import csv
import luigi
import mysql.connector
import psycopg2
from datetime import datetime
from config import MYSQL_CREDENTIALS, REDSHIFT_CREDENTIALS

class ExtractFromMySQL(luigi.Task):
    def output(self):
        return luigi.LocalTarget('data.txt')

    def run(self):
        connection = mysql.connector.connect(**MYSQL_CREDENTIALS)
        cursor = connection.cursor()

        with open('last_update.txt', 'r') as f:
            last_update = f.read().strip()

        query = f"""SELECT * FROM orders
                    WHERE update_date > '{last_update}'"""
        cursor.execute(query)

        records = cursor.fetchall()
        with self.output().open('w') as f:
            writer = csv.writer(f)
            for record in records:
                # Stringify values (e.g. datetimes) so each row round-trips through the CSV file
                writer.writerow([str(value) for value in record])

        with open('last_update.txt', 'w') as f:
            f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        cursor.close()
        connection.close()

class LoadToRedshift(luigi.Task):
    def requires(self):
        return ExtractFromMySQL()

    def run(self):
        connection = psycopg2.connect(**REDSHIFT_CREDENTIALS)
        cursor = connection.cursor()

        with self.input().open('r') as f:
            reader = csv.reader(f)
            for record in reader:
                # Parameterized insert; placeholders sized to the number of columns
                placeholders = ", ".join(["%s"] * len(record))
                cursor.execute(f"INSERT INTO orders VALUES ({placeholders})", record)

        connection.commit()
        cursor.close()
        connection.close()

if __name__ == '__main__':
    luigi.run()
In this script, we define two Luigi tasks: ExtractFromMySQL and LoadToRedshift. The ExtractFromMySQL task extracts new records from the MySQL database and writes them to a local file. The LoadToRedshift task reads the records from the local file and loads them to Redshift. The LoadToRedshift task depends on the ExtractFromMySQL task, so Luigi will automatically run ExtractFromMySQL first.
To run this ETL job, execute the following command in your terminal:
Plain Text
python etl_tasks.py LoadToRedshift
This will start the ETL job, and Luigi will automatically handle the dependencies between tasks. Note that Luigi considers a task complete once its output target exists, so for a recurring job you would typically parameterize the tasks (for example with a timestamp parameter) or remove the output file between runs so that each run produces a fresh target. If you want to schedule this job to run regularly, you can use a job scheduler like cron on Unix-based systems or Task Scheduler on Windows.
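For instance, a crontab entry along these lines would run the job at the top of every hour (the project path is a placeholder, and --local-scheduler avoids needing a separate luigid daemon):
Plain Text
0 * * * * cd /path/to/project && python etl_tasks.py LoadToRedshift --local-scheduler >> etl.log 2>&1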
The ETL job as described simply appends the latest rows from MySQL to the Redshift orders table. If a record in the MySQL orders table is updated and then extracted by the job, it will be inserted into the Redshift orders table again, potentially creating duplicate entries.
If you want to keep historical data, you would need to modify the ETL job to take this into account. One common approach is to create a slowly changing dimension (SCD) in your Redshift data warehouse. There are several types of SCD, but the two most common are:
  • Type 1: Overwrite old data with new changes, losing the history of changes.
  • Type 2: Keep a full history of data changes in the dimension.
Here’s how you might modify the LoadToRedshift task for a Type 2 SCD:
Python
class LoadToRedshift(luigi.Task):
    def requires(self):
        return ExtractFromMySQL()

    def run(self):
        connection = psycopg2.connect(**REDSHIFT_CREDENTIALS)
        cursor = connection.cursor()

        with self.input().open('r') as f:
            reader = csv.reader(f)
            for record in reader:
                # Check whether a currently valid version of this record exists (assumes id is the first column)
                cursor.execute("SELECT 1 FROM orders WHERE id = %s AND end_date IS NULL", (record[0],))
                if cursor.fetchone() is not None:
                    # Close out the current version by stamping its end_date
                    cursor.execute("UPDATE orders SET end_date = %s WHERE id = %s AND end_date IS NULL",
                                   (datetime.now(), record[0]))
                # Insert the new version with end_date = NULL, sizing the placeholders to the record
                placeholders = ", ".join(["%s"] * len(record))
                cursor.execute(f"INSERT INTO orders VALUES ({placeholders}, NULL)", record)

        connection.commit()
        cursor.close()
        connection.close()
In this example, the orders table in Redshift has an extra end_date column. When a new record is loaded, the task checks if the record already exists in the table. If it does, the task sets the end_date of the current record to the current date, marking the end of the record's validity. Then, the task inserts the new record, with the end_date set to NULL to indicate that this record is the current valid record.
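For reference, the Type 2 variant above assumes a Redshift orders table shaped roughly like the MySQL table sketched earlier, plus the end_date column. A hypothetical DDL executed with psycopg2:
Python
import psycopg2

# Illustrative Type 2 schema: the assumed MySQL columns plus end_date.
# A NULL end_date marks the currently valid version of a record.
SCD2_DDL = """
CREATE TABLE IF NOT EXISTS orders (
    id INT,
    customer_id INT,
    amount DECIMAL(10, 2),
    create_date TIMESTAMP,
    update_date TIMESTAMP,
    end_date TIMESTAMP
)
"""

connection = psycopg2.connect(user='redshift_user', password='redshift_password',
                              host='redshift_host', port='redshift_port', database='redshift_database')
cursor = connection.cursor()
cursor.execute(SCD2_DDL)
connection.commit()
cursor.close()
connection.close()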

Using Pyspark

PySpark is the Python API for Apache Spark. It lets you write Spark applications in Python and work with Spark’s DataFrame and RDD abstractions without leaving Python.
Below is an example of how you can use PySpark to perform the ETL operation.
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as spark_max

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MySQL to Redshift ETL") \
    .getOrCreate()

# Extract
mysql_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://mysql_host:3306/mysql_database") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "orders") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load()

# Transform
# Read the last_update watermark from a local file (any persistent store works);
# 'update_date' is the column we will use for CDC
with open('last_update.txt', 'r') as f:
    last_update = f.read().strip()

mysql_df = mysql_df.filter(col("update_date") > last_update)

# Load
mysql_df.write.format("jdbc") \
  .option("url", "jdbc:redshift://redshift_host:5439/redshift_database") \
  .option("driver", "com.amazon.redshift.jdbc.Driver") \
  .option("dbtable", "orders") \
  .option("user", "redshift_user") \
  .option("password", "redshift_password") \
  .mode("append") \
  .save()

# Update the 'last_update' watermark after the load is finished
new_last_update = mysql_df.select(spark_max("update_date")).collect()[0][0]
if new_last_update is not None:
    with open('last_update.txt', 'w') as f:
        f.write(str(new_last_update))

# Stop the SparkSession
spark.stop()
This script starts by creating a SparkSession, and then it reads the MySQL data into a DataFrame. It applies a filter on the ‘update_date’ column for CDC, and then writes the DataFrame to the Redshift database in append mode.
Please note that to make this work, you will need the JDBC drivers for MySQL and Redshift on Spark’s classpath, for example by passing the jar files to spark-submit with --jars.
In the script above, the last_update watermark is read from and written back to a local last_update.txt file. In practice, store this watermark somewhere persistent that fits your deployment, such as a database table, a file on durable storage, or a configuration store.
To schedule this PySpark script to run on an hourly basis, you could use cron on Unix-based systems or Task Scheduler on Windows; just point the scheduler at wherever your PySpark script is saved. An example crontab entry is shown below.
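For example, a crontab entry like the one below would submit the job hourly, passing the JDBC driver jars to Spark with --jars (all paths and jar file names are placeholders):
Plain Text
0 * * * * spark-submit --jars /path/to/mysql-connector-j.jar,/path/to/redshift-jdbc42.jar /path/to/etl_pyspark.py >> etl.log 2>&1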
Finally, please replace the placeholder values in the above script (like “mysql_host”, “mysql_user”, “mysql_database”, etc.) with your actual MySQL and Redshift credentials.

Using Petl

Petl is a general-purpose Python package for extracting, transforming, and loading tables of data. It’s simple and intuitive to use for basic ETL tasks.
Here’s how you can use Petl to implement the ETL job:

Prerequisites

  • Python 3 installed on your local machine
  • MySQL and AWS Redshift instances up and running
  • mysql-connector-python, psycopg2, and petl Python libraries installed
  • An orders table in your MySQL database with create_date and update_date columns
  • A last_update.txt file to store the timestamp of the last update (initialized as shown in the Luigi section)
Firstly, let’s create a config.py file for our database credentials:
Python
MYSQL_CREDENTIALS = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

REDSHIFT_CREDENTIALS = {
    'user': 'redshift_user',
    'password': 'redshift_password',
    'host': 'redshift_host',
    'port': 'redshift_port',
    'database': 'redshift_database'
}
Now, create an ETL job with Petl:
Python
import petl as etl
import mysql.connector
import psycopg2
from datetime import datetime
from config import MYSQL_CREDENTIALS, REDSHIFT_CREDENTIALS

def etl_job():
    # Create a MySQL connection
    mysql_conn = mysql.connector.connect(**MYSQL_CREDENTIALS)
    # Create a Redshift connection
    redshift_conn = psycopg2.connect(**REDSHIFT_CREDENTIALS)

    # Extract
    with open('last_update.txt', 'r') as f:
        last_update = f.read().strip()
    query = f"""SELECT * FROM orders
                WHERE update_date > '{last_update}'"""
    source_table = etl.fromdb(mysql_conn, query)

    # Transform (if needed)
    # ...

    # Load
    etl.appenddb(source_table, redshift_conn, 'orders', commit=True)

    # Update last_update
    with open('last_update.txt', 'w') as f:
        f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    etl_job()
In this script, the etl_job function first connects to the MySQL and Redshift databases. Then, it reads the new MySQL rows into a petl table and appends them to the Redshift orders table with the appenddb function, so previously loaded rows are preserved.
Finally, the script updates the last_update file with the current timestamp. You can schedule this script to run on an hourly basis using cron or a similar scheduling tool.
Please note that this script assumes that the structure of your ‘orders’ table in MySQL and Redshift is the same. If this isn’t the case, you may need to transform the data to match the destination schema; petl’s transformation functions make this straightforward, as sketched below.
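For example, a small sketch of adjusting a table with petl before loading (the column names here are purely illustrative):
Python
import petl as etl

# Toy example: rename a column and drop one that doesn't exist in the destination
source_table = [['id', 'amount', 'legacy_flag'],
                [1, 10.5, 'Y'],
                [2, 20.0, 'N']]

transformed = etl.rename(source_table, 'amount', 'order_amount')
transformed = etl.cutout(transformed, 'legacy_flag')

print(etl.look(transformed))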

Using Pandas

Pandas is a Python library providing powerful data structures and data analysis tools. Here’s how to implement ETL with Pandas:

Prerequisites

  • Python 3 installed on your local machine
  • MySQL and AWS Redshift instances up and running
  • mysql-connector-python, psycopg2, pandas, and sqlalchemy Python libraries installed
  • An orders table in your MySQL database with create_date and update_date columns
  • A last_update.txt file to store the timestamp of the last update (initialized as shown in the Luigi section)
First, let’s create a config.py file for our database credentials:
Python
MYSQL_CREDENTIALS = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

REDSHIFT_CREDENTIALS = {
    'user': 'redshift_user',
    'password': 'redshift_password',
    'host': 'redshift_host',
    'port': 'redshift_port',
    'database': 'redshift_database'
}
Now, create an ETL job with Pandas:
Python
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
from config import MYSQL_CREDENTIALS, REDSHIFT_CREDENTIALS

def etl_job():
    # Create a connection string for MySQL and Redshift
    mysql_conn_str = f"mysql+mysqlconnector://{MYSQL_CREDENTIALS['user']}:{MYSQL_CREDENTIALS['password']}@{MYSQL_CREDENTIALS['host']}/{MYSQL_CREDENTIALS['database']}"
    redshift_conn_str = f"postgresql+psycopg2://{REDSHIFT_CREDENTIALS['user']}:{REDSHIFT_CREDENTIALS['password']}@{REDSHIFT_CREDENTIALS['host']}:{REDSHIFT_CREDENTIALS['port']}/{REDSHIFT_CREDENTIALS['database']}"

    # Create a MySQL connection
    mysql_engine = create_engine(mysql_conn_str)

    # Extract
    with open('last_update.txt', 'r') as f:
        last_update = f.read().strip()
    query = f"""SELECT * FROM orders
                WHERE update_date > '{last_update}'"""
    source_df = pd.read_sql_query(query, mysql_engine)

    # Transform (if needed)
    # ...

    # Create a Redshift connection
    redshift_engine = create_engine(redshift_conn_str)

    # Load
    source_df.to_sql('orders', redshift_engine, if_exists='append', index=False)

    # Update last_update
    with open('last_update.txt', 'w') as f:
        f.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    etl_job()
In this script, the etl_job function first connects to the MySQL and Redshift databases. Then, it reads the MySQL data into a Pandas DataFrame. It writes the DataFrame to the Redshift database using the to_sql function, which appends the new data to the existing data.
Finally, the script updates the last_update file with the current timestamp. You can schedule this script to run on an hourly basis using cron or a similar scheduling tool.
Please note that this script assumes that the structure of your ‘orders’ table in MySQL and Redshift is the same. If this isn’t the case, you may need to transform the data to match the destination schema; a small pandas example is sketched below.
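For example, minor schema differences can be handled on the DataFrame before loading (the column names here are purely illustrative):
Python
import pandas as pd

# Toy example: rename a column and drop one that doesn't exist in the destination
source_df = pd.DataFrame({
    'id': [1, 2],
    'amount': [10.5, 20.0],
    'legacy_flag': ['Y', 'N'],
})

source_df = source_df.rename(columns={'amount': 'order_amount'}).drop(columns=['legacy_flag'])
print(source_df)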
There are many ways to accomplish ETL tasks and which one to use depends on the specific requirements of your project. Here are a few more Python-based tools and libraries that are commonly used for ETL tasks:
  1. Apache Beam: Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines. It provides a portable API layer for building sophisticated data-parallel pipelines that can run on a variety of execution engines, or runners.
  2. Bonobo: Bonobo is a lightweight and straightforward ETL framework for Python 3.5+. It’s well suited to building simple and maintainable ETL pipelines.
  3. Mara: Mara is a Python ETL tool that is lightweight but still offers the standard features for creating an ETL pipeline, including parallel execution, error handling, and logging.
  4. Dask: Dask is a flexible library for parallel computing in Python. It’s particularly suited to working with large datasets and includes built-in functionality for performing distributed computations.
Each of these libraries has its own strengths and weaknesses, so the right choice depends on the specific requirements of your project. For example, if you’re working with very large datasets and need distributed computation, PySpark or Dask might be a good choice.
We’ll review each of these tools with a full project implementation in separate articles.