Extract, Transform, Load (ETL) is a data pipeline process that involves extracting data from a source system, transforming it in some way, and then loading it into a target system. In this article, we’ll demonstrate how to build an ETL job that extracts data from a MySQL database and loads it into a Redshift data warehouse. We’ll also implement the Change Data Capture (CDC) concept to capture delta changes and trigger this ETL job every hour.
Using a Plain Python Script
Prerequisites
- Python 3 installed on your local machine
- MySQL and AWS Redshift instances up and running
- `mysql-connector-python` and `psycopg2` Python libraries installed
- An `orders` table in your MySQL database with `create_date` and `update_date` columns
Step 1: Extract Data from MySQL
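First, we will extract the data from the MySQL database using the `mysql-connector-python` library. Here's a simple Python function that connects to a MySQL database and fetches new records from the `orders` table:

```python
import mysql.connector
from datetime import datetime

# Watermark: rows with update_date before this value have already been extracted.
# Starting at the Unix epoch makes the first run a full load.
last_update = datetime(1970, 1, 1)

def extract_new_records():
    global last_update
    connection = mysql.connector.connect(user='mysql_user', password='mysql_password',
                                         host='mysql_host', database='mysql_database')
    cursor = connection.cursor()
    # Take the next watermark from the MySQL clock before querying, so rows
    # updated while this query runs are picked up next time instead of skipped
    cursor.execute("SELECT NOW()")
    new_watermark = cursor.fetchone()[0]
    # Parameterized query: the driver formats and quotes the timestamps
    query = """SELECT * FROM orders
               WHERE update_date >= %s AND update_date < %s"""
    cursor.execute(query, (last_update, new_watermark))
    records = cursor.fetchall()
    cursor.close()
    connection.close()
    last_update = new_watermark
    return records
```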
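Conceptually, the CDC filter selects the rows whose `update_date` falls inside a time window, and each run's window should start exactly where the previous one ended. The sketch below reproduces that logic in plain Python so it can be tested without a database; `select_changed` and the dict-shaped rows are hypothetical stand-ins for the SQL query and the MySQL result set. Using a half-open window `[start, end)` means a row whose `update_date` equals a boundary is picked up by exactly one run.

```python
from datetime import datetime


def select_changed(rows, start, end):
    """Return rows with start <= update_date < end (a half-open CDC window)."""
    return [row for row in rows if start <= row["update_date"] < end]


# Hypothetical rows standing in for the MySQL orders table
rows = [
    {"id": 1, "update_date": datetime(2023, 6, 1, 12, 30)},
    {"id": 2, "update_date": datetime(2023, 6, 1, 13, 0)},   # exactly on a boundary
    {"id": 3, "update_date": datetime(2023, 6, 1, 13, 45)},
]

first_run = select_changed(rows, datetime(1970, 1, 1), datetime(2023, 6, 1, 13, 0))
second_run = select_changed(rows, datetime(2023, 6, 1, 13, 0), datetime(2023, 6, 1, 14, 0))
print([r["id"] for r in first_run])   # [1]
print([r["id"] for r in second_run])  # [2, 3]
```

Every row lands in exactly one run, including the boundary row with `id` 2.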
Step 2: Load Data to Redshift
The next step is to load the extracted data into Redshift. We'll use the `psycopg2` library to insert the records into the Redshift table:

```python
import psycopg2

def load_to_redshift(records):
    if not records:
        return  # nothing changed since the last run
    connection = psycopg2.connect(user='redshift_user', password='redshift_password',
                                  host='redshift_host', port='redshift_port', database='redshift_database')
    cursor = connection.cursor()
    # One %s placeholder per column; psycopg2 adapts each value (datetimes,
    # NULLs, strings) instead of splicing str(record) into the SQL text
    placeholders = ", ".join(["%s"] * len(records[0]))
    insert_query = f"INSERT INTO orders VALUES ({placeholders})"
    cursor.executemany(insert_query, records)
    connection.commit()
    cursor.close()
    connection.close()
```
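Building SQL by concatenating `str(record)` does not produce valid SQL for most rows: Python's tuple representation spells out datetimes as `datetime.datetime(...)` and NULLs as `None`. Passing the values as query parameters lets the driver (psycopg2 here) quote each value correctly. The helper below is a hypothetical name; it only builds the statement text, which you would then pass to `cursor.executemany(sql, records)`.

```python
from datetime import datetime


def build_insert(table, column_count):
    """Build a parameterized INSERT with one %s placeholder per column.

    Only the values are parameterized; `table` is interpolated directly,
    so it must come from trusted code, never from user input.
    """
    placeholders = ", ".join(["%s"] * column_count)
    return f"INSERT INTO {table} VALUES ({placeholders})"


record = (1, datetime(2023, 6, 1, 13, 0), None)

# String concatenation embeds Python syntax that Redshift cannot parse
print("INSERT INTO orders VALUES " + str(record))
# INSERT INTO orders VALUES (1, datetime.datetime(2023, 6, 1, 13, 0), None)

# Parameterized form: psycopg2 fills each %s with a properly quoted value
print(build_insert("orders", len(record)))
# INSERT INTO orders VALUES (%s, %s, %s)
```

Row-by-row `INSERT` statements are slow on Redshift; for large batches, staging the data in S3 and loading it with Redshift's `COPY` command is usually much faster.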
Step 3: Schedule the ETL Job
Now that we have our extract and load functions, we need to schedule our ETL job to run every hour. We can use the `schedule` library for this:

```python
import schedule
import time

def etl_job():
    records = extract_new_records()
    load_to_redshift(records)

# Schedule the job every hour
schedule.every(1).hours.do(etl_job)

# Keep the script running
while True:
    schedule.run_pending()
    time.sleep(1)
```
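Two behaviors of the `schedule` library are worth knowing here: it does not catch exceptions raised by a job, so a single failed run propagates out of `run_pending()` and ends the `while` loop, and `every(1).hours` first fires one hour after the script starts. The sketch below (a hypothetical `log_failures` decorator, standard library only) logs a failure and returns instead of raising, so the next hourly run still happens.

```python
import functools
import logging


def log_failures(job):
    """Wrap a scheduled job so an exception is logged instead of propagating."""
    @functools.wraps(job)
    def wrapper(*args, **kwargs):
        try:
            return job(*args, **kwargs)
        except Exception:
            logging.exception("ETL job %s failed; it will run again at the next interval",
                              job.__name__)
            return None
    return wrapper
```

With it, you could create `wrapped_job = log_failures(etl_job)`, call `wrapped_job()` once before the `while` loop so the first load does not wait an hour, and register it with `schedule.every(1).hours.do(wrapped_job)`. Note that in this script the watermark advances during extraction, so a batch whose load fails is not retried; moving the `last_update` assignment to after a successful load avoids that.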
Local Development Steps
- Install required libraries: run `pip install mysql-connector-python psycopg2-binary schedule` in your terminal.
- Save the above code in a Python file, e.g., `etl_job.py`.
- Update the MySQL and Redshift connection details in the `extract_new_records` and `load_to_redshift` functions.
- Run the script: run `python etl_job.py` in your terminal to start the ETL job.
Using Airflow
Apache Airflow is a powerful platform used to programmatically author, schedule, and monitor workflows.
Here’s how you might set up the ETL job using Airflow.
Prerequisites
- Apache Airflow 2.x is installed on your local machine.
- MySQL and AWS Redshift instances are up and running.
- `mysql-connector-python` and `psycopg2` Python libraries are installed.
- An `orders` table in your MySQL database with `create_date` and `update_date` columns.
Step 1: Define the Extraction & Loading Function
First, we will define the extraction and loading functions in Python:

```python
import mysql.connector
import psycopg2
from datetime import datetime
from airflow.models import Variable

def extract_new_records():
    # Read the watermark inside the task rather than at module level, which would
    # query the metadata DB every time the scheduler parses this file.
    # Variables are stored as strings, so parse the value back into a datetime.
    last_update = datetime.fromisoformat(
        Variable.get("last_update", default_var="1970-01-01T00:00:00"))
    connection = mysql.connector.connect(user='mysql_user', password='mysql_password',
                                         host='mysql_host', database='mysql_database')
    cursor = connection.cursor()
    # Take the new watermark from the MySQL clock before querying
    cursor.execute("SELECT NOW()")
    new_watermark = cursor.fetchone()[0]
    cursor.execute("SELECT * FROM orders WHERE update_date >= %s AND update_date < %s",
                   (last_update, new_watermark))
    records = cursor.fetchall()
    cursor.close()
    connection.close()
    # Store the new watermark back to the Airflow Variable as an ISO-8601 string
    Variable.set("last_update", new_watermark.isoformat())
    return records

def load_to_redshift(records):
    if not records:
        return  # nothing changed since the last run
    connection = psycopg2.connect(user='redshift_user', password='redshift_password',
                                  host='redshift_host', port='redshift_port', database='redshift_database')
    cursor = connection.cursor()
    # Parameterized insert: psycopg2 adapts datetimes, NULLs, and strings
    placeholders = ", ".join(["%s"] * len(records[0]))
    cursor.executemany(f"INSERT INTO orders VALUES ({placeholders})", records)
    connection.commit()
    cursor.close()
    connection.close()
```
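Airflow Variables are stored as strings: `Variable.set` saves `str(value)`, and `Variable.get` returns that string (or `default_var` when the key is missing). Code that expects a `datetime` back will therefore work on the first run and fail on the second. A pair of small conversion helpers (hypothetical names, standard library only) keeps the conversion in one place:

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)


def watermark_from_variable(raw):
    """Convert a stored Variable value (a string, or None if unset) to a datetime."""
    if raw is None:
        return EPOCH
    # fromisoformat accepts both isoformat() output and str(datetime) output
    return datetime.fromisoformat(raw)


def watermark_to_variable(value):
    """Serialize a datetime for Variable.set in a form fromisoformat can read."""
    return value.isoformat()
```

Inside the task, you would then read the watermark with `watermark_from_variable(Variable.get("last_update", default_var=None))` and save it with `Variable.set("last_update", watermark_to_variable(new_watermark))`.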
Step 2: Define Airflow DAG and Tasks
Create a new DAG file in your Airflow DAGs directory (typically `$AIRFLOW_HOME/dags`). This example assumes that file is named `mysql_to_redshift.py`:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# Assumes the functions above are saved in etl_functions.py in the DAGs directory
from etl_functions import extract_new_records, load_to_redshift

# Define the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'mysql_to_redshift_cdc',
    default_args=default_args,
    description='A simple ETL job',
    # A fixed start date (days_ago() is deprecated); catchup=False stops Airflow
    # from backfilling one run for every hour since that date
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
)

# Define the tasks
extract_task = PythonOperator(
    task_id='extract_from_mysql',
    python_callable=extract_new_records,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_to_redshift',
    python_callable=load_to_redshift,
    dag=dag,
)

# Set the task dependencies
extract_task >> load_task
```
Note: As written, this DAG does not hand the extracted records to the load task: Airflow calls `load_to_redshift` without arguments, so the task fails with a missing `records` argument. In Apache Airflow, you pass data between tasks using XComs (cross-communication), a mechanism that lets tasks exchange messages or small amounts of data. XCom values are stored in the Airflow metadata database and can be fetched by downstream tasks, so they suit small batches rather than large extracts.

Here's a modified version of the code that uses XComs:

```python
# Step 1: Extract data from MySQL and load it to Redshift (etl_functions.py)
# ... (unchanged import statements)

def extract_new_records(**kwargs):
    # ... (same as before: read the watermark, query MySQL, update the Variable)
    # Push records to XCom
    kwargs['ti'].xcom_push(key='extracted_records', value=records)

def load_to_redshift(**kwargs):
    # Fetch records from XCom
    ti = kwargs['ti']
    records = ti.xcom_pull(task_ids='extract_from_mysql', key='extracted_records')
    # ... (same as before)

# Step 2: Define Airflow DAG and Tasks (mysql_to_redshift.py)
# ... (unchanged import statements)

# Define the DAG
# ... (same as before)

# Define the tasks. Airflow 2 passes the task context (including 'ti') to any
# callable that accepts **kwargs, so no extra operator argument is needed.
extract_task = PythonOperator(
    task_id='extract_from_mysql',
    python_callable=extract_new_records,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_to_redshift',
    python_callable=load_to_redshift,
    dag=dag,
)

# Set the task dependencies
extract_task >> load_task
```

In this updated code:
- `extract_new_records()` now pushes the `records` to XCom after fetching them.
- `load_to_redshift()` pulls the `records` from XCom before inserting them into the Redshift database.
- Both callables accept `**kwargs`, so Airflow passes in the task context and you can use `kwargs['ti']` to access the task instance and, consequently, use XComs. In Airflow 1.x this also required `provide_context=True` on each `PythonOperator`; Airflow 2 removed that argument.
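XCom values are serialized before they are stored in the metadata database. Depending on your Airflow version and XCom backend, rows containing `datetime` or `Decimal` values may not serialize as-is. One option, sketched below with a hypothetical helper, is to convert each row to JSON-friendly values before `xcom_push`; Redshift should cast the resulting strings back to the column types when they are inserted through query parameters.

```python
import json
from datetime import datetime
from decimal import Decimal


def to_json_safe(records):
    """Return rows as lists of JSON-friendly values.

    default=str renders datetime and Decimal values as strings; tuples become lists.
    """
    return json.loads(json.dumps(records, default=str))


rows = [(1, datetime(2023, 6, 1, 13, 0), Decimal("12.50"), None)]
print(to_json_safe(rows))
# [[1, '2023-06-01 13:00:00', '12.50', None]]
```

In `extract_new_records`, you would then push `to_json_safe(records)` instead of `records`.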
Local Development Steps
- Install required libraries: run `pip install apache-airflow mysql-connector-python psycopg2-binary` in your terminal.
- Save the above Python functions in a file in your Airflow DAGs directory, e.g., `etl_functions.py`, so the DAG file can import them.
- Update the MySQL and Redshift connection details in the `extract_new_records` and `load_to_redshift` functions.
- Save the DAG file in your Airflow DAGs directory.
- On the first run, initialize the metadata database and create a UI user (`airflow db init` and `airflow users create`; the Dockerfile below shows the full commands).
- Start the Airflow webserver and scheduler: run `airflow webserver` and `airflow scheduler` in your terminal.
- Open the Airflow UI in your web browser (usually `localhost:8080`); you should see your DAG `mysql_to_redshift_cdc`. Trigger the DAG and monitor the execution.
Below are the steps to set up a Docker environment for running the Airflow ETL job locally.
Step 1: Create Dockerfile
First, let's create a Dockerfile to set up the Python environment and install the necessary dependencies:

```dockerfile
# Use an official Python runtime as a parent image
FROM python:3.8-slim-buster

# Set the working directory. Using it as AIRFLOW_HOME makes Airflow's default
# DAGs folder /usr/src/app/dags, which is where docker-compose mounts ./dags
WORKDIR /usr/src/app
ENV AIRFLOW_HOME=/usr/src/app

# Install Airflow and other dependencies
RUN pip install --no-cache-dir apache-airflow mysql-connector-python psycopg2-binary

# Copy the current directory contents into the container at /usr/src/app
COPY . .

# Expose the default Airflow webserver port
EXPOSE 8080

# Initialize the metadata DB, create an admin user, start the webserver in the
# background, and run the scheduler in the foreground so the container stays up
CMD airflow db init && \
    airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.com && \
    airflow webserver -D && \
    airflow scheduler
```
Step 2: Create docker-compose.yml file
Next, we'll use Docker Compose to spin up our Airflow service. Docker Compose allows us to define and run multi-container Docker applications. Create a `docker-compose.yml` file in the same directory as your Dockerfile:

```yaml
version: '3'
services:
  airflow:
    build: .
    volumes:
      - ./dags:/usr/src/app/dags
    ports:
      - "8080:8080"
```
This Compose file defines an Airflow service built from the Dockerfile in the current directory, mounts the local `dags` directory at `/usr/src/app/dags` in the container, and forwards the container's port 8080 to port 8080 on the host machine.

Step 3: Build and Run the Docker Container
Finally, use the following commands to build and run your Docker container:

```shell
docker-compose build
docker-compose up
```
Now, you should be able to access the Airflow webserver at `localhost:8080` (log in with the `admin`/`admin` user created in the Dockerfile). You can trigger your DAG and monitor the execution from the web UI.

Local Development Steps
- Ensure Docker and Docker Compose are installed on your local machine.
- Save the above Dockerfile in your project directory.
- Save your Airflow DAG and Python functions in a `dags` directory in your project directory.
- Save the `docker-compose.yml` file in your project directory.
- Build and run your Docker container using the `docker-compose build` and `docker-compose up` commands.
Using Luigi
Luigi is a Python module developed by Spotify that helps to build complex pipelines of batch jobs. Here’s how to implement an ETL job with Luigi that extracts data from a MySQL database, implements Change Data Capture (CDC), and loads it into an Amazon Redshift data warehouse.
Prerequisites
- Python 3 installed on your local machine
- MySQL and AWS Redshift instances up and running
- `mysql-connector-python`, `psycopg2` and `luigi` Python libraries installed
- An `orders` table in your MySQL database with `create_date` and `update_date` columns
- A `last_update.txt` file to store the timestamp of the last update (seed it with a starting value such as `1970-01-01 00:00:00` before the first run)
Firstly, let's create a `config.py` file for our database credentials:

```python
MYSQL_CREDENTIALS = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

REDSHIFT_CREDENTIALS = {
    'user': 'redshift_user',
    'password': 'redshift_password',
    'host': 'redshift_host',
    'port': 'redshift_port',
    'database': 'redshift_database'
}
```
Now, let's create our Luigi tasks in a file named `etl_tasks.py`:

```python
import json
from datetime import datetime

import luigi
import mysql.connector
import psycopg2

from config import MYSQL_CREDENTIALS, REDSHIFT_CREDENTIALS


class ExtractFromMySQL(luigi.Task):
    # One output file per hour. With a fixed name such as data.txt, Luigi would
    # treat the task as complete forever after the first run.
    date_hour = luigi.DateHourParameter(default=datetime.now())

    def output(self):
        return luigi.LocalTarget(f"data_{self.date_hour:%Y%m%dT%H}.jsonl")

    def run(self):
        with open('last_update.txt', 'r') as f:
            last_update = f.read().strip()
        connection = mysql.connector.connect(**MYSQL_CREDENTIALS)
        cursor = connection.cursor()
        # Take the new watermark from the MySQL clock before querying
        cursor.execute("SELECT NOW()")
        new_watermark = cursor.fetchone()[0]
        cursor.execute("SELECT * FROM orders WHERE update_date >= %s AND update_date < %s",
                       (last_update, new_watermark))
        records = cursor.fetchall()
        cursor.close()
        connection.close()
        # One JSON array per line; default=str turns datetime/Decimal values into strings
        with self.output().open('w') as f:
            for record in records:
                f.write(json.dumps(record, default=str) + '\n')
        with open('last_update.txt', 'w') as f:
            f.write(new_watermark.strftime("%Y-%m-%d %H:%M:%S"))


class LoadToRedshift(luigi.Task):
    date_hour = luigi.DateHourParameter(default=datetime.now())

    def requires(self):
        return ExtractFromMySQL(date_hour=self.date_hour)

    def output(self):
        # Marker file: without an output, Luigi would re-run the load every time
        return luigi.LocalTarget(f"loaded_{self.date_hour:%Y%m%dT%H}.done")

    def run(self):
        with self.input().open('r') as f:
            records = [json.loads(line) for line in f]
        connection = psycopg2.connect(**REDSHIFT_CREDENTIALS)
        cursor = connection.cursor()
        if records:
            # Parameterized insert instead of splicing str(record) into the SQL
            placeholders = ", ".join(["%s"] * len(records[0]))
            cursor.executemany(f"INSERT INTO orders VALUES ({placeholders})", records)
        connection.commit()
        cursor.close()
        connection.close()
        with self.output().open('w') as f:
            f.write('loaded\n')


if __name__ == '__main__':
    luigi.run()
```

In this script, we define two Luigi tasks: `ExtractFromMySQL` and `LoadToRedshift`. The `ExtractFromMySQL` task extracts new records from the MySQL database and writes them, one JSON array per line, to a local file named after the hour being processed. The `LoadToRedshift` task reads the records from that file, loads them to Redshift, and then writes a small marker file. Because each task's output is tied to its `date_hour` parameter, Luigi treats every hour as a separate unit of work: a completed hour is not extracted or loaded twice, and a new hour gets new files. The `LoadToRedshift` task depends on the `ExtractFromMySQL` task, so Luigi will automatically run `ExtractFromMySQL` first.

To run this ETL job, execute the following command in your terminal:

```shell
python etl_tasks.py LoadToRedshift --local-scheduler
```

The `--local-scheduler` flag runs the tasks without a central `luigid` scheduler process. This will start the ETL job, and Luigi will automatically handle the dependencies between tasks. If you want to schedule this job to run regularly, you can use a job scheduler like cron on Unix-based systems or Task Scheduler on Windows.

The ETL job as described does not update existing rows in the Redshift `orders` table; it only appends. If a record in the MySQL `orders` table is updated and then extracted by the job, the new version is inserted into the Redshift `orders` table alongside the old one, creating duplicate entries for the same order.

If you want to keep historical data, you would need to modify the ETL job to take this into account. One common approach is to create a slowly changing dimension (SCD) in your Redshift data warehouse. There are several types of SCD, but the two most common are:
- Type 1: Overwrite old data with new changes, losing the history of changes.
- Type 2: Keep a full history of data changes in the dimension.

Here's how you might modify the `LoadToRedshift` task for a Type 2 SCD:

```python
class LoadToRedshift(luigi.Task):
    date_hour = luigi.DateHourParameter(default=datetime.now())

    def requires(self):
        return ExtractFromMySQL(date_hour=self.date_hour)

    def output(self):
        return luigi.LocalTarget(f"loaded_{self.date_hour:%Y%m%dT%H}.done")

    def run(self):
        connection = psycopg2.connect(**REDSHIFT_CREDENTIALS)
        cursor = connection.cursor()
        now = datetime.now()
        with self.input().open('r') as f:
            for line in f:
                record = json.loads(line)
                # Check if a current version of the record exists (assumes the first column is the id)
                cursor.execute("SELECT 1 FROM orders WHERE id = %s AND end_date IS NULL", (record[0],))
                if cursor.fetchone() is not None:
                    # If it exists, end the current version
                    cursor.execute("UPDATE orders SET end_date = %s WHERE id = %s AND end_date IS NULL",
                                   (now, record[0]))
                # Insert the new version; the trailing NULL fills the end_date column
                placeholders = ", ".join(["%s"] * len(record))
                cursor.execute(f"INSERT INTO orders VALUES ({placeholders}, NULL)", record)
        connection.commit()
        cursor.close()
        connection.close()
        with self.output().open('w') as f:
            f.write('loaded\n')
```
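The Type 2 bookkeeping can be tried out without a Redshift connection. The sketch below is plain Python; `apply_scd2` and the dict-based rows are hypothetical stand-ins for the `orders` table. It applies the same rule the modified task does with its `UPDATE` and `INSERT` statements: close the current version of each changed record, then append the new version with no `end_date`.

```python
from datetime import datetime


def apply_scd2(history, changes, now):
    """Apply extracted rows to a Type 2 history list, in place.

    history: dicts with an "id", data columns, and "end_date" (None = current).
    changes: dicts with an "id" and data columns, as extracted from MySQL.
    """
    for change in changes:
        # Close the current version of this record, if there is one
        for version in history:
            if version["id"] == change["id"] and version["end_date"] is None:
                version["end_date"] = now
        # Open a new current version
        history.append({**change, "end_date": None})
    return history


history = [{"id": 1, "status": "new", "end_date": None}]
changes = [{"id": 1, "status": "shipped"}, {"id": 2, "status": "new"}]
now = datetime(2023, 6, 1, 14, 0)
for row in apply_scd2(history, changes, now):
    print(row)
# {'id': 1, 'status': 'new', 'end_date': datetime.datetime(2023, 6, 1, 14, 0)}
# {'id': 1, 'status': 'shipped', 'end_date': None}
# {'id': 2, 'status': 'new', 'end_date': None}
```

Selecting the rows whose `end_date` is `NULL` gives the current state of each order, while the full table keeps every version.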
In the modified `LoadToRedshift` task, the Redshift `orders` table has an extra `end_date` column (assumed here to be its last column). When a new record is loaded, the task checks whether a current version of that record already exists in the table. If it does, the task sets the `end_date` of that version to the current timestamp, marking the end of the record's validity. Then, the task inserts the new record with `end_date` set to `NULL` to indicate that it is the current valid version.

Using PySpark
PySpark is the Python API for Apache Spark. It lets you write Spark applications in Python and work with Spark DataFrames and RDDs from Python code.
Below is an example of how you can use PySpark to perform the ETL operation:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as spark_max

# Placeholder: load last_update from persistent storage (see the note below).
# The epoch value means "load everything" on the first run.
last_update = "1970-01-01 00:00:00"

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MySQL to Redshift ETL") \
    .getOrCreate()

# Extract
mysql_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://mysql_host:3306/mysql_database") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "orders") \
    .option("user", "mysql_user") \
    .option("password", "mysql_password") \
    .load()

# Transform: 'update_date' is the column we use for CDC
changed_df = mysql_df.filter(col("update_date") > last_update)

# Fix the new watermark before writing. Spark re-reads MySQL for each action,
# so bounding the write by this value leaves rows changed mid-job for the next run.
new_last_update = changed_df.select(spark_max("update_date")).collect()[0][0]

if new_last_update is not None:
    # Load
    changed_df.filter(col("update_date") <= new_last_update) \
        .write.format("jdbc") \
        .option("url", "jdbc:redshift://redshift_host:5439/redshift_database") \
        .option("driver", "com.amazon.redshift.jdbc.Driver") \
        .option("dbtable", "orders") \
        .option("user", "redshift_user") \
        .option("password", "redshift_password") \
        .mode("append") \
        .save()
    # Persist this value so the next run starts from it
    last_update = new_last_update

# Stop the SparkSession
spark.stop()
```

The driver class names depend on the JDBC driver versions you install (`com.mysql.cj.jdbc.Driver` is the MySQL Connector/J 8+ name; check the Redshift driver's documentation for its class name).
This script starts by creating a SparkSession, and then it reads the MySQL data into a DataFrame. It applies a filter on the ‘update_date’ column for CDC, and then writes the DataFrame to the Redshift database in append mode.
Please note, to make this work, you will need the JDBC drivers for MySQL and Redshift, and Spark should be able to access these drivers.
Also, the `last_update` value should be stored somewhere persistent: read it before the Transform step and update it after the Load step. How you store it will depend on your specific application; a database table or a file are common options. An environment variable does not work well here, because a value set by one run of the job does not carry over to the next.
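If you choose a file, it helps to write it atomically so that a crash mid-write cannot leave a truncated timestamp behind. A minimal sketch, assuming the watermark lives in a local file that the Spark driver can read and write (the helper names are hypothetical); the same helpers would also fit the `last_update.txt` files used elsewhere in this article:

```python
import os
import tempfile
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def read_watermark(path, default=datetime(1970, 1, 1)):
    """Return the stored watermark, or `default` if the file does not exist yet."""
    try:
        with open(path) as f:
            return datetime.strptime(f.read().strip(), TIMESTAMP_FORMAT)
    except FileNotFoundError:
        return default


def write_watermark(path, value):
    """Write the watermark to a temp file, then swap it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(value.strftime(TIMESTAMP_FORMAT))
        # os.replace is atomic on POSIX when both paths are on the same filesystem
        os.replace(tmp_path, path)
    except BaseException:
        os.remove(tmp_path)
        raise
```

In the PySpark script, `last_update = read_watermark("last_update.txt")` would replace the placeholder assignment, and `write_watermark("last_update.txt", new_last_update)` would run after the write succeeds.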
To schedule this PySpark script to run on an hourly basis, you could use cron on Unix-based systems or Task Scheduler on Windows. Just point them to wherever your PySpark script is saved.
Finally, please replace the placeholder values in the above script (like “mysql_host”, “mysql_user”, “mysql_database”, etc.) with your actual MySQL and Redshift credentials.
Using Petl
Petl is a general-purpose Python package for extracting, transforming, and loading tables of data. It’s simple and intuitive to use for basic ETL tasks.
Here’s how you can use Petl to implement the ETL job:
Prerequisites
- Python 3 installed on your local machine
- MySQL and AWS Redshift instances up and running
mysql-connector-python
,psycopg2
andpetl
Python libraries installed
- An
orders
table in your MySQL database withcreate_date
andupdate_date
columns
Firstly, let's create a `config.py` file for our database credentials:

```python
MYSQL_CREDENTIALS = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

REDSHIFT_CREDENTIALS = {
    'user': 'redshift_user',
    'password': 'redshift_password',
    'host': 'redshift_host',
    'port': 'redshift_port',
    'database': 'redshift_database'
}
```
Now, create an ETL job with Petl:

```python
import petl as etl
import mysql.connector
import psycopg2
from config import MYSQL_CREDENTIALS, REDSHIFT_CREDENTIALS

def etl_job():
    # Create MySQL and Redshift connections
    mysql_conn = mysql.connector.connect(**MYSQL_CREDENTIALS)
    redshift_conn = psycopg2.connect(**REDSHIFT_CREDENTIALS)

    # Extract
    with open('last_update.txt', 'r') as f:
        last_update = f.read().strip()
    # New watermark from the MySQL clock, captured before extracting
    cursor = mysql_conn.cursor()
    cursor.execute("SELECT NOW()")
    new_watermark = cursor.fetchone()[0]
    cursor.close()
    # fromdb passes the extra argument through to cursor.execute(); the query
    # itself runs lazily, when appenddb iterates over the table
    source_table = etl.fromdb(
        mysql_conn,
        "SELECT * FROM orders WHERE update_date >= %s AND update_date < %s",
        (last_update, new_watermark))

    # Transform (if needed)
    # ...

    # Load: append to the existing table (todb would truncate it first)
    etl.appenddb(source_table, redshift_conn, 'orders', commit=True)

    mysql_conn.close()
    redshift_conn.close()

    # Update last_update only after the load has committed
    with open('last_update.txt', 'w') as f:
        f.write(new_watermark.strftime("%Y-%m-%d %H:%M:%S"))
```

In this script, the `etl_job` function first connects to the MySQL and Redshift databases. Then, it reads the changed MySQL rows into a Petl table and appends them to the Redshift `orders` table using the `appenddb` function. (Petl's `todb` function truncates the target table before inserting, which on an incremental run would delete everything loaded by earlier runs.)

Finally, the script updates the `last_update` file with the new watermark. You can schedule this script to run on an hourly basis using cron or a similar scheduling tool.

Please note that this script assumes that the structure of your `orders` table in MySQL and Redshift is the same. If this isn't the case, you may need to transform the data to match the destination schema.
Using Pandas
Pandas is a Python library providing powerful data structures and data analysis tools. Here’s how to implement ETL with Pandas:
Prerequisites
- Python 3 installed on your local machine
- MySQL and AWS Redshift instances up and running
mysql-connector-python
,psycopg2
,pandas
andsqlalchemy
Python libraries installed
- An
orders
table in your MySQL database withcreate_date
andupdate_date
columns
First, let's create a `config.py` file for our database credentials:

```python
MYSQL_CREDENTIALS = {
    'user': 'mysql_user',
    'password': 'mysql_password',
    'host': 'mysql_host',
    'database': 'mysql_database'
}

REDSHIFT_CREDENTIALS = {
    'user': 'redshift_user',
    'password': 'redshift_password',
    'host': 'redshift_host',
    'port': 'redshift_port',
    'database': 'redshift_database'
}
```
Now, create an ETL job with Pandas:

```python
import pandas as pd
from sqlalchemy import create_engine, text
from config import MYSQL_CREDENTIALS, REDSHIFT_CREDENTIALS

def etl_job():
    # Create connection strings for MySQL and Redshift
    mysql_conn_str = f"mysql+mysqlconnector://{MYSQL_CREDENTIALS['user']}:{MYSQL_CREDENTIALS['password']}@{MYSQL_CREDENTIALS['host']}/{MYSQL_CREDENTIALS['database']}"
    redshift_conn_str = f"postgresql+psycopg2://{REDSHIFT_CREDENTIALS['user']}:{REDSHIFT_CREDENTIALS['password']}@{REDSHIFT_CREDENTIALS['host']}:{REDSHIFT_CREDENTIALS['port']}/{REDSHIFT_CREDENTIALS['database']}"

    # Create a MySQL engine
    mysql_engine = create_engine(mysql_conn_str)

    # Extract
    with open('last_update.txt', 'r') as f:
        last_update = f.read().strip()
    with mysql_engine.connect() as conn:
        # New watermark from the MySQL clock, captured before extracting
        new_watermark = conn.execute(text("SELECT NOW()")).scalar()
    # Bound parameters instead of an f-string; SQLAlchemy passes them to the driver
    query = text("SELECT * FROM orders WHERE update_date >= :start_ts AND update_date < :end_ts")
    source_df = pd.read_sql_query(query, mysql_engine,
                                  params={"start_ts": last_update, "end_ts": new_watermark})

    # Transform (if needed)
    # ...

    # Create a Redshift engine
    redshift_engine = create_engine(redshift_conn_str)

    # Load: append the changed rows to the existing table
    source_df.to_sql('orders', redshift_engine, if_exists='append', index=False)

    # Update last_update only after the load has succeeded
    with open('last_update.txt', 'w') as f:
        f.write(new_watermark.strftime("%Y-%m-%d %H:%M:%S"))
```
In this script, the `etl_job` function first connects to the MySQL and Redshift databases. Then, it reads the MySQL data into a Pandas DataFrame. It writes the DataFrame to the Redshift database using the `to_sql` function, which appends the new data to the existing data.

Finally, the script updates the `last_update` file with the current timestamp. You can schedule this script to run on an hourly basis using cron or a similar scheduling tool.

Please note that this script assumes that the structure of your `orders` table in MySQL and Redshift is the same. If this isn't the case, you may need to transform the data to match the destination schema.
There are many ways to accomplish ETL tasks and which one to use depends on the specific requirements of your project. Here are a few more Python-based tools and libraries that are commonly used for ETL tasks:
- Apache Beam: Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines. It provides a portable API layer for building sophisticated data-parallel processing pipelines that may be executed across a diversity of execution engines, or runners.
- Bonobo: Bonobo is a lightweight ETL framework for Python 3.5+ that builds jobs as graphs of plain Python callables. It's well suited to building simple, maintainable ETL pipelines.
- Mara: Mara is a Python ETL tool that is lightweight but still offers the standard features for creating an ETL pipeline including parallel execution, error handling, and logging.
- Dask: Dask is a flexible library for parallel computing in Python. It’s particularly suited to working with large datasets and includes built-in functionality for performing distributed computations.
Each of these libraries has its own strengths and weaknesses, so the right choice depends on the specific requirements of your project. For example, if you’re working with very large datasets and need distributed computation, PySpark or Dask might be a good choice.
We will review each tool through a full project implementation in separate articles.