Airflow#

Airflow is a tool for scheduling sets of processes, commonly used in ETL pipelines and sometimes in ML automation. This section considers typical ways to use Airflow.

We typically run Airflow in a Docker container to make sure we are working in a clean environment.

Build the Docker image described in airflow_files/dockerfile and run it with the standalone command:

docker build -t airflow -f packages/airflow_files/dockerfile .
docker run -d --rm --name airflow -p 8080:8080 -v ./:/knowledge airflow standalone

The image used in this example is configured to create a default user with login user and password user, which serve as the credentials for the Airflow server.

Configuration file#

The global configuration of Airflow is stored in a special file: airflow.cfg. Different installations put this file in different places (as usual). Typical locations are /opt/airflow/airflow.cfg and ~/airflow/airflow.cfg.


In any case, use the following command to find the location of airflow.cfg on your disk.

!find / -name airflow.cfg
/opt/airflow/airflow.cfg

DAG#

DAG stands for Directed Acyclic Graph. It is a collection of tasks together with the relationships between them.


As a basic example, consider the procedure for adding a DAG to Airflow.

First you need to identify the folder containing DAGs. This folder is specified by the dags_folder parameter of the configuration file.

!cat /opt/airflow/airflow.cfg | grep dags_folder
dags_folder = /opt/airflow/dags

A DAG must be implemented in a special DAG file: the file that contains an airflow.models.DAG object. It takes many settings that determine its behavior, but in general only dag_id needs to be specified. The following cell defines a DAG with the id tutorial.

%%writefile /opt/airflow/dags/tutorial.py
from airflow.models.dag import DAG

with DAG("tutorial") as dag:
    pass
Writing /opt/airflow/dags/tutorial.py
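
For reference, here is a sketch of a DAG that sets a few other commonly used parameters. The parameter names assume a recent Airflow 2.x release (in older versions schedule was called schedule_interval), and the dag_id tutorial_configured is made up for the example. The rest of this section sticks to the minimal tutorial DAG defined above.

from datetime import datetime, timedelta
from airflow.models.dag import DAG

with DAG(
    "tutorial_configured",                # hypothetical dag_id for illustration
    schedule="@daily",                    # run once a day; cron expressions also work
    start_date=datetime(2025, 1, 1),      # first logical date of the schedule
    catchup=False,                        # do not backfill missed intervals
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:
    pass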

The following command causes Airflow to add the DAG to its database.

!airflow db migrate
DB: sqlite:////opt/airflow/airflow.db
Performing upgrade to the metadata database sqlite:////opt/airflow/airflow.db
[2025-03-22T14:14:21.698+0000] {migration.py:207} INFO - Context impl SQLiteImpl.
[2025-03-22T14:14:21.700+0000] {migration.py:210} INFO - Will assume non-transactional DDL.
[2025-03-22T14:14:21.704+0000] {migration.py:207} INFO - Context impl SQLiteImpl.
[2025-03-22T14:14:21.704+0000] {migration.py:210} INFO - Will assume non-transactional DDL.
[2025-03-22T14:14:21.705+0000] {db.py:1675} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
Database migrating done!

With airflow dags list you can display the DAGs that Airflow sees.

!airflow dags list
dag_id   | fileloc                       | owners | is_paused
=========+===============================+========+==========
tutorial | /opt/airflow/dags/tutorial.py |        | True     
                                                             

The output shows the DAG we have just added.

Task flow#

There is an alternative way to define a DAG: the TaskFlow syntax. You wrap the function that will act as a DAG with the airflow.decorators.dag decorator and call the resulting function.

Check Working with TaskFlow for more details.


The following cell defines the DAG in the Task Flow syntax.

%%writefile /opt/airflow/dags/tutorial.py
from airflow.decorators import dag

@dag("taskflow_example")
def tutorial():
    pass

tutorial()
Overwriting /opt/airflow/dags/tutorial.py

The following cell reloads the Airflow database. As a result, the dag_id corresponds to the parameter specified in airflow.decorators.dag.

%%bash
airflow db migrate &> /dev/null
airflow dags list
dag_id           | fileloc                       | owners | is_paused
=================+===============================+========+==========
taskflow_example | /opt/airflow/dags/tutorial.py |        | True     
                                                                     

Tasks#

A task is the basic execution unit in Airflow. A task can be created from:

  • Operator: a preset template for creating a task.

  • Sensor: a special kind of operator that triggers on some event.

  • TaskFlow syntax: you just have to wrap a normal Python function in airflow.decorators.task (see the sketch after this list).
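
For illustration, here is a minimal sketch of TaskFlow-style tasks; the dag_id and function names are made up for the example.

from airflow.decorators import dag, task

@dag("taskflow_tasks_example")
def taskflow_tasks_example():

    @task
    def extract():
        # A regular Python function becomes a task once wrapped in @task.
        return [1, 2, 3]

    @task
    def summarize(values):
        print(sum(values))

    # Passing the output of one task to another defines the dependency.
    summarize(extract())

taskflow_tasks_example()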

Here is a list of common Airflow operators (a short PythonOperator sketch follows the list):

  • PythonOperator: Executes a Python function.

  • BashOperator: Runs a Bash command or script.

  • PostgresOperator: Executes a SQL query in a PostgreSQL database.

  • MySqlOperator: Executes a SQL query in a MySQL database.

  • SqliteOperator: Executes a SQL query in a SQLite database.

  • MsSqlOperator: Executes a SQL query in an MS SQL Server database.

  • SnowflakeOperator: Executes a query in Snowflake.

  • BigQueryOperator: Executes a SQL query in Google BigQuery.

  • HiveOperator: Executes a HiveQL query in Apache Hive.

  • S3FileTransformOperator: Transfers and transforms files in AWS S3.

  • EmailOperator: Sends an email.

  • HttpOperator: Sends an HTTP request.

  • DummyOperator (deprecated, use EmptyOperator): A placeholder operator that does nothing.

  • EmptyOperator: A lightweight no-op operator, replacing DummyOperator.

  • BranchPythonOperator: Chooses the next task dynamically based on a Python function.

  • TriggerDagRunOperator: Triggers another DAG.

  • KubernetesPodOperator: Runs a task inside a Kubernetes pod.
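
For example, here is a minimal sketch of a DAG that uses PythonOperator; the dag_id, task_id and callable are made up for illustration and the import path assumes Airflow 2.x.

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def greet():
    # Regular Python callable executed by the operator.
    print("Hello from PythonOperator")

with DAG("python_operator_example") as dag:
    PythonOperator(
        task_id="greet",
        python_callable=greet,
    )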

Find out more:

  • In the official Tasks page.

  • In the specific page.


As an example, consider the basic tools for handling tasks in Airflow.

The following code defines a DAG with a bash task and loads it into Airflow.

%%writefile /opt/airflow/dags/tutorial.py
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG("tutorial") as dag:
    BashOperator(
        task_id="hello_world",
        bash_command="echo 'Hello world'"
    )
Overwriting /opt/airflow/dags/tutorial.py
!airflow db migrate &> /dev/null

List the tasks of the DAG with airflow tasks list <name of the dag>.

!airflow tasks list tutorial
hello_world

Run tasks for testing purposes using airflow tasks test <name of dag> <name of task>.

!airflow tasks test tutorial hello_world
[2025-03-22T14:47:49.566+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-03-22T14:47:49.698+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.hello_world __airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__ [None]>
[2025-03-22T14:47:49.702+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.hello_world __airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__ [None]>
[2025-03-22T14:47:49.702+0000] {taskinstance.py:2867} INFO - Starting attempt 0 of 1
[2025-03-22T14:47:49.703+0000] {taskinstance.py:2948} WARNING - cannot record queued_duration for task hello_world because previous state change time has not been saved
[2025-03-22T14:47:49.703+0000] {taskinstance.py:2890} INFO - Executing <Task(BashOperator): hello_world> on 2025-03-22 14:47:49.584873+00:00
[2025-03-22T14:47:50.022+0000] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='hello_world' AIRFLOW_CTX_EXECUTION_DATE='2025-03-22T14:47:49.584873+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__'
[2025-03-22T14:47:50.025+0000] {taskinstance.py:732} INFO - ::endgroup::
[2025-03-22T14:47:50.040+0000] {subprocess.py:78} INFO - Tmp dir root location: /tmp
[2025-03-22T14:47:50.041+0000] {subprocess.py:88} INFO - Running command: ['/usr/bin/bash', '-c', "echo 'Hello world'"]
[2025-03-22T14:47:50.048+0000] {subprocess.py:99} INFO - Output:
[2025-03-22T14:47:50.049+0000] {subprocess.py:106} INFO - Hello world
[2025-03-22T14:47:50.050+0000] {subprocess.py:110} INFO - Command exited with return code 0
[2025-03-22T14:47:50.065+0000] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-03-22T14:47:50.065+0000] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=hello_world, run_id=__airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__, execution_date=20250322T144749, start_date=, end_date=20250322T144750

The output contains a lot of logging information, but for now only the line after “Output” is important. The next cell highlights it.

!airflow tasks test tutorial hello_world | grep "Output" -A 1
[2025-03-22T14:48:00.890+0000] {subprocess.py:99} INFO - Output:
[2025-03-22T14:48:00.891+0000] {subprocess.py:106} INFO - Hello world

Database#

Airflow keeps all its data in a special database. It can be useful to be able to access this data directly. Different databases can be used, but the default and simplest option is SQLite.
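
To check which database your installation uses, you can look up the sql_alchemy_conn parameter in airflow.cfg (in recent Airflow 2.x releases it lives in the [database] section). The command below is a sketch that assumes the configuration file location found earlier.

!cat /opt/airflow/airflow.cfg | grep sql_alchemy_conn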

The following list describes some of the popular tables available in the SQLite database.

  • dag: Stores metadata about DAGs.

  • dag_run: Stores metadata about DAG runs (instances of DAG executions).

  • task_instance: Stores metadata about task instances (individual task executions).

  • log: Stores logs for task instances.

  • job: Stores metadata about jobs, such as the scheduler and backfill jobs.

  • xcom: Stores XCom (cross-communication) data for tasks.

  • variable: Stores Airflow variables.

  • connection: Stores Airflow connections.

  • task_fail: Stores information about task failures.

  • sla_miss: Stores information about SLA misses.


The following cell shows how to access the task_instance table from a Python script.

import sqlite3
import pandas as pd
con = sqlite3.connect("/opt/airflow/airflow.db")
pd.read_sql_query("SELECT * FROM task_instance LIMIT 5;", con)
task_id dag_id run_id map_index start_date end_date duration state try_number max_tries ... executor executor_config updated_at rendered_map_index external_executor_id trigger_id trigger_timeout next_method next_kwargs task_display_name
0 hello_world tutorial backfill__2015-06-01T00:00:00+00:00 -1 2025-03-22 20:18:53.268787 2025-03-22 20:18:53.409019 0.140232 success 1 0 ... None b'\x80\x05}\x94.' 2025-03-22 20:18:53.420899 None None None None None None hello_world
1 hello_world tutorial backfill__2015-06-02T00:00:00+00:00 -1 2025-03-22 20:18:55.898364 2025-03-22 20:18:56.036250 0.137886 success 1 0 ... None b'\x80\x05}\x94.' 2025-03-22 20:18:56.047650 None None None None None None hello_world
2 hello_world tutorial backfill__2015-06-03T00:00:00+00:00 -1 2025-03-22 20:18:58.540839 2025-03-22 20:18:58.686317 0.145478 success 1 0 ... None b'\x80\x05}\x94.' 2025-03-22 20:18:58.693254 None None None None None None hello_world
3 hello_world tutorial backfill__2015-06-04T00:00:00+00:00 -1 2025-03-22 20:19:01.298076 2025-03-22 20:19:01.443630 0.145554 success 1 0 ... None b'\x80\x05}\x94.' 2025-03-22 20:19:01.452251 None None None None None None hello_world
4 hello_world tutorial backfill__2015-06-05T00:00:00+00:00 -1 2025-03-22 20:19:04.029111 2025-03-22 20:19:04.177918 0.148807 success 1 0 ... None b'\x80\x05}\x94.' 2025-03-22 20:19:04.185538 None None None None None None hello_world

5 rows × 32 columns