Airflow#
Airflow is a tool for scheduling sets of processes, commonly used in ETL pipelines and sometimes in ML automation. This section covers typical ways to use Airflow.
We typically run Airflow in a Docker container to make sure that we are working in a clean environment. Build the Docker image described in airflow_files/dockerfile
and run it with the standalone
command.
docker build -t airflow -f packages/airflow_files/dockerfile .
docker run -d --rm --name airflow -p 8080:8080 -v ./:/knowledge airflow standalone
The example image is configured to create a default user with login user
and password user
, which serve as the credentials for the Airflow server.
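The dockerfile itself is not listed here, but such a user can be created with the standard airflow users create command. A minimal sketch of that setup step (the first name, last name, and email below are assumptions):
airflow users create \
    --username user \
    --password user \
    --firstname User \
    --lastname User \
    --role Admin \
    --email user@example.com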
Configuration file#
The global configuration of Airflow is stored in a special file: airflow.cfg
. Different installations put this file in different places; typical locations are /opt/airflow/airflow.cfg
and ~/airflow/airflow.cfg
.
In any case, use the following command to find the location of airflow.cfg
on your disk.
!find / -name airflow.cfg
/opt/airflow/airflow.cfg
DAG#
DAG stands for Directed Acyclic Graph: a collection of tasks and the relationships between them.
As a basic example, consider the procedure for adding a DAG to Airflow.
First you need to identify the folder containing DAGs. This folder is specified by the dags_folder
parameter of the configuration file.
!cat /opt/airflow/airflow.cfg | grep dags_folder
dags_folder = /opt/airflow/dags
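As a side note, recent Airflow versions can report the same value through the CLI, which avoids grepping the file (whether this subcommand is available depends on your Airflow version):
!airflow config get-value core dags_folder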
A DAG must be implemented in a special DAG file: a Python file that contains an airflow.models.DAG
object. The object takes many settings that determine its behavior, but in general only dag_id
needs to be specified. The following cell defines a DAG with the id tutorial
.
%%writefile /opt/airflow/dags/tutorial.py
from airflow.models.dag import DAG
with DAG("tutorial") as dag:
    pass
Writing /opt/airflow/dags/tutorial.py
The following command causes Airflow to add the DAG to its database.
!airflow db migrate
DB: sqlite:////opt/airflow/airflow.db
Performing upgrade to the metadata database sqlite:////opt/airflow/airflow.db
[2025-03-22T14:14:21.698+0000] {migration.py:207} INFO - Context impl SQLiteImpl.
[2025-03-22T14:14:21.700+0000] {migration.py:210} INFO - Will assume non-transactional DDL.
[2025-03-22T14:14:21.704+0000] {migration.py:207} INFO - Context impl SQLiteImpl.
[2025-03-22T14:14:21.704+0000] {migration.py:210} INFO - Will assume non-transactional DDL.
[2025-03-22T14:14:21.705+0000] {db.py:1675} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Database migrating done!
With airflow dags list
you can show the DAGs that Airflow sees.
!airflow dags list
dag_id | fileloc | owners | is_paused
=========+===============================+========+==========
tutorial | /opt/airflow/dags/tutorial.py | | True
As a result, the DAG we have added is listed.
TaskFlow#
There is an alternative way to define a DAG: the TaskFlow syntax. You wrap the function that will act as a DAG with the airflow.decorators.dag
decorator and call the resulting function.
Check Working with TaskFlow for more details.
The following cell defines the DAG in the Task Flow syntax.
%%writefile /opt/airflow/dags/tutorial.py
from airflow.decorators import dag
@dag("taskflow_example")
def tutorial():
    pass
tutorial()
Overwriting /opt/airflow/dags/tutorial.py
The following cell reloads the Airflow database; as a result, the dag_id
corresponds to the parameter specified in airflow.decorators.dag
.
%%bash
airflow db migrate &> /dev/null
airflow dags list
dag_id | fileloc | owners | is_paused
=================+===============================+========+==========
taskflow_example | /opt/airflow/dags/tutorial.py | | True
Tasks#
A task is the basic execution unit in Airflow. A task can be created from:
Operator: a preset template for creating a task.
Sensor: a special kind of operator that triggers on some event.
TaskFlow syntax: you just wrap a normal Python function in
airflow.decorators.task
(see the sketch after this list).
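For illustration, a minimal TaskFlow-style task could look like this (a sketch; the DAG and task names are hypothetical):
from airflow.decorators import dag, task

@dag("taskflow_task_example")
def taskflow_task_example():

    @task
    def hello():
        # an ordinary Python function promoted to a task
        print("Hello from a TaskFlow task")

    hello()

taskflow_task_example()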
Here is a list of common Airflow operators:
Operator | Description
---|---
PythonOperator | Executes a Python function.
BashOperator | Runs a Bash command or script.
PostgresOperator | Executes a SQL query in a PostgreSQL database.
MySqlOperator | Executes a SQL query in a MySQL database.
SqliteOperator | Executes a SQL query in a SQLite database.
MsSqlOperator | Executes a SQL query in an MS SQL Server database.
SnowflakeOperator | Executes a query in Snowflake.
BigQueryOperator | Executes a SQL query in Google BigQuery.
HiveOperator | Executes a HiveQL query in Apache Hive.
S3FileTransformOperator | Transfers and transforms files in AWS S3.
EmailOperator | Sends an email.
SimpleHttpOperator | Sends an HTTP request.
DummyOperator | A placeholder operator that does nothing.
EmptyOperator | A lightweight no-op operator, replacing DummyOperator.
BranchPythonOperator | Chooses the next task dynamically based on a Python function.
TriggerDagRunOperator | Triggers another DAG.
KubernetesPodOperator | Runs a task inside a Kubernetes pod.
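As a quick illustration of the first entry, a PythonOperator task might be defined like this (a sketch; the DAG id, task id, and callable are hypothetical):
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def greet():
    # the callable that the operator will execute
    print("Hello from PythonOperator")

with DAG("python_operator_example") as dag:
    PythonOperator(task_id="greet", python_callable=greet)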
As an example, consider the basic tools for handling tasks in Airflow.
The following code defines a DAG with a bash
task and loads it into Airflow.
%%writefile /opt/airflow/dags/tutorial.py
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
with DAG("tutorial") as dag:
    BashOperator(
        task_id="hello_world",
        bash_command="echo 'Hello world'"
    )
Overwriting /opt/airflow/dags/tutorial.py
!airflow db migrate &> /dev/null
List the tasks of the DAG with airflow tasks list <name of the dag>
.
!airflow tasks list tutorial
hello_world
Run tasks for testing purposes using airflow tasks test <name of dag> <name of task>.
!airflow tasks test tutorial hello_world
[2025-03-22T14:47:49.566+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-03-22T14:47:49.698+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.hello_world __airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__ [None]>
[2025-03-22T14:47:49.702+0000] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.hello_world __airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__ [None]>
[2025-03-22T14:47:49.702+0000] {taskinstance.py:2867} INFO - Starting attempt 0 of 1
[2025-03-22T14:47:49.703+0000] {taskinstance.py:2948} WARNING - cannot record queued_duration for task hello_world because previous state change time has not been saved
[2025-03-22T14:47:49.703+0000] {taskinstance.py:2890} INFO - Executing <Task(BashOperator): hello_world> on 2025-03-22 14:47:49.584873+00:00
[2025-03-22T14:47:50.022+0000] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='hello_world' AIRFLOW_CTX_EXECUTION_DATE='2025-03-22T14:47:49.584873+00:00' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__'
[2025-03-22T14:47:50.025+0000] {taskinstance.py:732} INFO - ::endgroup::
[2025-03-22T14:47:50.040+0000] {subprocess.py:78} INFO - Tmp dir root location: /tmp
[2025-03-22T14:47:50.041+0000] {subprocess.py:88} INFO - Running command: ['/usr/bin/bash', '-c', "echo 'Hello world'"]
[2025-03-22T14:47:50.048+0000] {subprocess.py:99} INFO - Output:
[2025-03-22T14:47:50.049+0000] {subprocess.py:106} INFO - Hello world
[2025-03-22T14:47:50.050+0000] {subprocess.py:110} INFO - Command exited with return code 0
[2025-03-22T14:47:50.065+0000] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-03-22T14:47:50.065+0000] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=hello_world, run_id=__airflow_temporary_run_2025-03-22T14:47:49.584901+00:00__, execution_date=20250322T144749, start_date=, end_date=20250322T144750
The outputs contain a lot of logging information, but for now only the line after “Output” is important. The next cell highlights it.
!airflow tasks test tutorial hello_world | grep "Output" -A 1
[2025-03-22T14:48:00.890+0000] {subprocess.py:99} INFO - Output:
[2025-03-22T14:48:00.891+0000] {subprocess.py:106} INFO - Hello world
Database#
Airflow keeps all its data in a special database. It can be useful to be able to access this data directly. Different database backends can be used, but the default and simplest option is SQLite.
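The backend is selected by the sql_alchemy_conn option of the configuration file. For example, a PostgreSQL backend could be configured with a line like the following (the URI is purely illustrative; in older Airflow versions this option lives in the [core] section instead of [database]):
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow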
The following table lists some of the popular tables available in the SQLite database.
Table Name | Description
---|---
dag | Stores metadata about DAGs.
dag_run | Stores metadata about DAG runs (instances of DAG executions).
task_instance | Stores metadata about task instances (individual task executions).
log | Stores logs for task instances.
job | Stores metadata about jobs, such as the scheduler and backfill jobs.
xcom | Stores XCom (cross-communication) data for tasks.
variable | Stores Airflow variables.
connection | Stores Airflow connections.
task_fail | Stores information about task failures.
sla_miss | Stores information about SLA misses.
The following cell shows how to access the task_instance
table from a Python script.
import sqlite3
import pandas as pd

# connect to the default SQLite metadata database and preview the task_instance table
con = sqlite3.connect("/opt/airflow/airflow.db")
pd.read_sql_query("SELECT * FROM task_instance LIMIT 5;", con)
task_id | dag_id | run_id | map_index | start_date | end_date | duration | state | try_number | max_tries | ... | executor | executor_config | updated_at | rendered_map_index | external_executor_id | trigger_id | trigger_timeout | next_method | next_kwargs | task_display_name | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | hello_world | tutorial | backfill__2015-06-01T00:00:00+00:00 | -1 | 2025-03-22 20:18:53.268787 | 2025-03-22 20:18:53.409019 | 0.140232 | success | 1 | 0 | ... | None | b'\x80\x05}\x94.' | 2025-03-22 20:18:53.420899 | None | None | None | None | None | None | hello_world |
1 | hello_world | tutorial | backfill__2015-06-02T00:00:00+00:00 | -1 | 2025-03-22 20:18:55.898364 | 2025-03-22 20:18:56.036250 | 0.137886 | success | 1 | 0 | ... | None | b'\x80\x05}\x94.' | 2025-03-22 20:18:56.047650 | None | None | None | None | None | None | hello_world |
2 | hello_world | tutorial | backfill__2015-06-03T00:00:00+00:00 | -1 | 2025-03-22 20:18:58.540839 | 2025-03-22 20:18:58.686317 | 0.145478 | success | 1 | 0 | ... | None | b'\x80\x05}\x94.' | 2025-03-22 20:18:58.693254 | None | None | None | None | None | None | hello_world |
3 | hello_world | tutorial | backfill__2015-06-04T00:00:00+00:00 | -1 | 2025-03-22 20:19:01.298076 | 2025-03-22 20:19:01.443630 | 0.145554 | success | 1 | 0 | ... | None | b'\x80\x05}\x94.' | 2025-03-22 20:19:01.452251 | None | None | None | None | None | None | hello_world |
4 | hello_world | tutorial | backfill__2015-06-05T00:00:00+00:00 | -1 | 2025-03-22 20:19:04.029111 | 2025-03-22 20:19:04.177918 | 0.148807 | success | 1 | 0 | ... | None | b'\x80\x05}\x94.' | 2025-03-22 20:19:04.185538 | None | None | None | None | None | None | hello_world |
5 rows × 32 columns