Why Use Airflow for Schema Migrations?

Before I get into the tutorial: there are already a few tools out there to help you manage your schema migrations, e.g. Flyway, Liquibase and Alembic. This article outlines the possibilities of handling schema migrations with Airflow DAGs instead.

Investigation of Other Solutions

Flyway → A very development-heavy solution for small projects, especially for any non-JVM project (pyway exists for Python, but it doesn’t offer the same range of support for DB engines). ❌

Liquibase → Although it can be tried for free for a limited time, this is ultimately a paid service. ❌

Alembic → A complex framework; autogeneration only works some of the time because of Redshift-specific behaviours (e.g. altering a column in Redshift drops the column and re-adds it). It is built on the SQLAlchemy toolkit, so there is also overhead in getting a Redshift dialect working with SQLAlchemy. ❌

Airflow → Open source and free. Integrates easily with Amazon Redshift. Written in pure Python and well suited to quickly creating POCs. ✔️

Prerequisites

  1. Proficiency writing Airflow DAGs and programming in Python.
  2. Proficiency with SQL.
  3. A Redshift client library (e.g. boto3) already set up within your Airflow environment.

Redshift Limitations

One issue I recently came across was a table already populated in Redshift that needed a schema change, specifically the addition of new columns.

Redshift offers IF [NOT] EXISTS clauses at the table level (e.g. CREATE TABLE IF NOT EXISTS, DROP TABLE IF EXISTS), but such functionality isn’t available at the column level. So simply writing a SQL script to be run as an Airflow Bash command wasn’t an option.

e.g.

ALTER TABLE example_table
ADD COLUMN IF NOT EXISTS start_time timestamp;

This would’ve been far too simple!

Unfortunately, there is no Redshift equivalent of ADD COLUMN IF NOT EXISTS, so we need to think outside the box if we want to safely perform migrations, on production tables in particular!

That’s when Redshift’s metadata tables come in handy! Redshift uses pg_table_def to store metadata about the tables visible to the user (note that it only covers schemas included in your search_path). Using this, we can query the table and check whether the column being added to our table already exists.

SELECT EXISTS (
    SELECT count(*) AS column_count
    FROM pg_table_def
    WHERE schemaname = 'public'
      AND tablename = 'example_table'
      AND "column" IN ('start_time')
    GROUP BY tablename
    HAVING count(*) = 1
) AS pg_table_def_check;

The SQL query above should return True or False depending on whether the column exists in the table.

The subquery’s IN clause allows additional columns to be included. For example, if we want to add two more columns, all that needs to be done is to add them to the IN (...) list and update the HAVING clause to equal 3 instead of 1.

The outer query wraps this subquery into a boolean value.

With this, we only receive True when every column in the list is already present in the table; if any of them are missing, the response will be False.
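As an illustration of that point, here’s a minimal sketch (not from the original article) of a helper that generates the check for an arbitrary list of new columns. The function name and parameters are hypothetical, so adapt them to your own task layout:

def build_pg_table_def_check(schema, table, columns):
    """Return SQL that is True only when all of `columns` already exist on `table`."""
    # Assumes trusted, hard-coded column names from our own migration code
    # (no escaping or parameter binding is done here).
    in_list = ", ".join(f"'{c}'" for c in columns)
    return f"""
        SELECT EXISTS (
            SELECT 1
            FROM pg_table_def
            WHERE schemaname = '{schema}'
              AND tablename = '{table}'
              AND "column" IN ({in_list})
            GROUP BY tablename
            HAVING count(*) = {len(columns)}
        ) AS pg_table_def_check;
    """

# e.g. build_pg_table_def_check('public', 'example_table', ['start_time', 'end_time'])
# produces a query with two entries in the IN list and HAVING count(*) = 2.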

Airflow DAG

Now let's design the Airflow DAG. We want this to be a very simple DAG, nothing overly complex for this operation. Using the template provided by Airflow will suffice.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # in Airflow 2+: airflow.operators.bash
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG('schema_migration',
          default_args=default_args,
          description='A Redshift Schema Migration DAG',
          schedule_interval="@once")

schema_migrator = BashOperator(
    task_id='schema_migrator',
    bash_command='cd /path/to/local/airflow; python3 -m path.to.airflow.task.schema_migrator',
    dag=dag)

The majority of the template will remain the same; the main change is setting schedule_interval to "@once" so the migration is only ever scheduled a single time. This gives us the ability to manually trigger the migration when it’s safe to do so (avoiding situations where another DAG’s task could be running a DDL/DML command on the table while our task is trying to alter it!).

Airflow Migrate Schemas Task

Now for the main event, our migrate_schemas task! The layout of this task is very straightforward. We simply want to:

  1. Use the pg_table_def_check script to check whether the column(s) already exist on the table of interest.
  2. If they don’t exist, run the ALTER TABLE script to add the new columns.
  3. If they do, then skip to the next migration script.

Here’s an example of the migration script directory:

↳ migrate_schemas
    ↳ sql_scripts
        ↳ pg_table_def_checks
            ↳ 001_example_table_pg_table_def_check.sql
        ↳ alter_table_script
            ↳ 001_example_table_alter_add_columns.sql

Using an index prefix for our scripts ensures that, for each migration we add, the pg_table_def_check script and the alter script always correspond to the correct migration when they are read and run.

All that’s left is to iterate over both script directories, and whenever the response from a pg_table_def_check is False, execute the corresponding alter script.
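Here’s a minimal sketch of what that iteration could look like, written as the module the BashOperator above calls with python3 -m. It assumes the directory layout shown earlier and a psycopg2 connection to Redshift; the paths, connection details and helper names are illustrative assumptions rather than the article’s actual implementation, so swap in whichever Redshift client your Airflow environment already provides (e.g. boto3’s Redshift Data API):

# schema_migrator.py -- illustrative sketch only; paths and connection details are assumptions.
import os

import psycopg2  # or whichever Redshift client your Airflow setup already provides

SCRIPTS_DIR = 'migrate_schemas/sql_scripts'
CHECKS_DIR = os.path.join(SCRIPTS_DIR, 'pg_table_def_checks')
ALTERS_DIR = os.path.join(SCRIPTS_DIR, 'alter_table_script')


def run_migrations(conn):
    # Pair the check and alter scripts via their shared index prefix (001_, 002_, ...);
    # sorting both directories keeps the pairs aligned.
    checks = sorted(os.listdir(CHECKS_DIR))
    alters = sorted(os.listdir(ALTERS_DIR))
    for check_file, alter_file in zip(checks, alters):
        with open(os.path.join(CHECKS_DIR, check_file)) as f:
            check_sql = f.read()
        with open(os.path.join(ALTERS_DIR, alter_file)) as f:
            alter_sql = f.read()

        with conn.cursor() as cur:
            cur.execute(check_sql)
            columns_already_exist = cur.fetchone()[0]  # the pg_table_def_check boolean

            if not columns_already_exist:
                # The columns are missing, so it's safe to run this migration.
                cur.execute(alter_sql)
                conn.commit()
            # Otherwise the migration has already been applied; skip to the next one.


if __name__ == '__main__':
    connection = psycopg2.connect(
        host='my-cluster.example.redshift.amazonaws.com',  # placeholder connection details
        port=5439, dbname='dev', user='airflow', password='...')
    try:
        run_migrations(connection)
    finally:
        connection.close()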

Using this approach, we can manually trigger an Airflow task that iterates over the migration scripts, checks whether the conditions are right to alter a table, and if so runs our migration.

Redshift Transactional Blocks

Transactions are important to use in any migration, as they ensure that if a single statement fails, the entire migration can be rolled back.

When there is more than one alter script in the directory it’s possible to run into issues when attempting to run concurrent DDL/DML commands in Redshift.

e.g. 001_alter_script.sql changes a column type in a table and also performs a VACUUM FULL to re-sort the table based on the new type.

In order to avoid any conflict, we need to tell Redshift to finish a transactional block before we start another (VACUUM, for instance, can’t run inside a transaction block at all).

This can be achieved by adding END; or COMMIT; to the end of any transactional block.

Conclusion

Using Airflow for a migration POC like this can save a lot of development time and effort compared to fiddling with third-party options.

Airflow allows us to safely trigger a task to iterate over migration scripts, check if the conditions are correct for a migration and if so run our migration manually.

This is a good option for demonstrating the possibilities of Airflow DAGs/Tasks, especially when schema migrations aren’t occurring frequently enough to justify a larger-scale project.

However, it should be noted that this POC shouldn’t be used for databases that require a more mature solution for version management, schema migrations and ensuring the consistency of the database’s state in a production environment.

Thank you for reading!