API
This page covers the internal API of Airflow.
Operators & Tasks
A task is a unit of work in Airflow.
An operator is a template for a task; in fact, a task is an instance of an operator.
The following cell creates a task from the PythonOperator.
from unittest.mock import MagicMock
from airflow.providers.standard.operators.python import PythonOperator
def process_data():
    print("hello from task")
task = PythonOperator(
    task_id="process_data",
    python_callable=process_data,
)
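The operator/task relationship can be illustrated in plain Python. The sketch below is not Airflow's implementation: MiniOperator is a hypothetical class playing the role of an operator (the template), and each object created from it plays the role of a task with its own task_id.

```python
from typing import Any, Callable


class MiniOperator:
    """Hypothetical stand-in for an operator class such as PythonOperator."""

    def __init__(self, task_id: str, python_callable: Callable[[], Any]) -> None:
        # Each instance (a "task") is configured with its own id and callable.
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self, context: dict) -> Any:
        # The class defines how to run; the instance supplies what to run.
        return self.python_callable()


def extract() -> str:
    return "extracted"


def load() -> str:
    return "loaded"


# Two tasks built from the same template.
extract_task = MiniOperator(task_id="extract", python_callable=extract)
load_task = MiniOperator(task_id="load", python_callable=load)

results = {t.task_id: t.execute({}) for t in (extract_task, load_task)}
print(results)  # {'extract': 'extracted', 'load': 'loaded'}
```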
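The execute method runs the logic implemented by the task. It normally receives its context from Airflow's Task Runner; here it is called directly with a mocked context, hence the warning in the output:
# Call execute directly; the dictionary imitates the context the Task Runner would provide.
task.execute({
    "task_instance": MagicMock(),
    "dag_run": MagicMock(),
    "logical_date": "2026-06-04",
})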
2026-06-04T20:19:20.685280Z [warning ] PythonOperator.execute cannot be called outside of the Task Runner! [airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator] loc=operator.py:437
hello from task
2026-06-04T20:19:20.688651Z [info ] Done. Returned value was: None [airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator] loc=python.py:233
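PythonOperator passes entries of the context to the callable as keyword arguments when the callable declares parameters with matching names, or all of them when it accepts **kwargs; process_data above declares none, so it receives nothing. The sketch below illustrates that idea with inspect.signature. call_with_context is a hypothetical helper, and Airflow's real logic also handles op_args and op_kwargs.

```python
import inspect
from typing import Any, Callable


def call_with_context(fn: Callable[..., Any], context: dict) -> Any:
    """Call fn, passing only the context keys it can accept (hypothetical helper)."""
    params = inspect.signature(fn).parameters.values()
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        # fn accepts **kwargs: hand over the whole context.
        return fn(**context)
    # Otherwise pass only the keys that match named parameters.
    names = {p.name for p in params}
    return fn(**{k: v for k, v in context.items() if k in names})


context = {"logical_date": "2026-06-04", "dag_run": object()}


def no_args():
    return "no context needed"


def with_date(logical_date):
    return f"running for {logical_date}"


def with_everything(**kwargs):
    return sorted(kwargs)


print(call_with_context(no_args, context))          # no context needed
print(call_with_context(with_date, context))        # running for 2026-06-04
print(call_with_context(with_everything, context))  # ['dag_run', 'logical_date']
```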
DAG
A DAG (directed acyclic graph) is a set of nodes (tasks) connected by dependencies that determine the order in which the tasks are executed. In Airflow, it is represented by the airflow.DAG class.
See the DAG reference in the Airflow documentation for more details.
The following cell defines a simple DAG:
from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(dag_id="learning_dag") as dag:
    start = EmptyOperator(task_id="start")
    middle = EmptyOperator(task_id="middle")
    end = EmptyOperator(task_id="end")
    start >> middle >> end
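The start >> middle >> end line works because Python lets a class overload >> through __rshift__. In Airflow, >> makes the right operand downstream of the left one and returns the right operand, which is what makes chaining possible. The sketch below reproduces that mechanism with a hypothetical Node class; it is not Airflow's code.

```python
class Node:
    """Hypothetical task-like node that records its dependencies."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: list["Node"] = []
        self.downstream: list["Node"] = []

    def set_downstream(self, other: "Node") -> None:
        self.downstream.append(other)
        other.upstream.append(self)

    def __rshift__(self, other: "Node") -> "Node":
        # a >> b makes b run after a; returning b allows a >> b >> c.
        self.set_downstream(other)
        return other


start, middle, end = Node("start"), Node("middle"), Node("end")
start >> middle >> end

print([n.task_id for n in start.downstream])   # ['middle']
print([n.task_id for n in middle.downstream])  # ['end']
print([n.task_id for n in end.upstream])       # ['middle']
```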
The target of the context manager (the name after as) is the DAG instance:
dag
<DAG: learning_dag>
The tasks of the DAG:
dag.tasks
[<Task(EmptyOperator): start>,
<Task(EmptyOperator): middle>,
<Task(EmptyOperator): end>]
The tasks in topological order:
dag.topological_sort()
(<Task(EmptyOperator): start>,
<Task(EmptyOperator): middle>,
<Task(EmptyOperator): end>)
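A topological order lists every task after all of its upstream tasks. The sketch below computes one with Kahn's algorithm over a plain dictionary mapping each task ID to its downstream task IDs; it illustrates the concept and is not necessarily how DAG.topological_sort is implemented.

```python
from collections import deque


def topological_sort(downstream: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm: repeatedly emit nodes that have no unprocessed upstream."""
    # Count incoming edges for every node.
    in_degree = {node: 0 for node in downstream}
    for children in downstream.values():
        for child in children:
            in_degree[child] = in_degree.get(child, 0) + 1

    queue = deque(node for node, deg in in_degree.items() if deg == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for child in downstream.get(node, []):
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)

    if len(order) != len(in_degree):
        # Some nodes were never freed: the graph has a cycle, so it is not a DAG.
        raise ValueError("graph contains a cycle")
    return order


graph = {"start": ["middle"], "middle": ["end"], "end": []}
print(topological_sort(graph))  # ['start', 'middle', 'end']
```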
The starting nodes (roots) and the final nodes (leaves) of the graph:
dag.roots, dag.leaves
([<Task(EmptyOperator): start>], [<Task(EmptyOperator): end>])
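Roots are the tasks with no upstream dependencies and leaves are the tasks with no downstream dependencies. Using the same kind of task-ID-to-downstream mapping (an assumption of this sketch, not Airflow's internal representation), both can be computed directly:

```python
def roots_and_leaves(downstream: dict[str, list[str]]) -> tuple[list[str], list[str]]:
    """Return (roots, leaves); assumes every task appears as a key of downstream."""
    # Any task listed as someone's child has at least one upstream task.
    has_upstream = {child for children in downstream.values() for child in children}
    roots = [node for node in downstream if node not in has_upstream]
    leaves = [node for node, children in downstream.items() if not children]
    return roots, leaves


graph = {"start": ["middle"], "middle": ["end"], "end": []}
print(roots_and_leaves(graph))  # (['start'], ['end'])
```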
DagBag
The DagBag object in Airflow is responsible for loading DAGs. You specify the folder where your DAG files are located; Airflow then imports them and constructs a DAG object for each DAG they define.
The following cell creates a folder containing a simple DAG that will be loaded by the DagBag object.
!mkdir -p /tmp/experimental_dags
%%writefile /tmp/experimental_dags/simple_dag.py
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(
    dag_id="simple_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    task = EmptyOperator(
        task_id="task",
    )
Overwriting /tmp/experimental_dags/simple_dag.py
Loading the DAGs from the folder:
from airflow.dag_processing.dagbag import DagBag
dagbag = DagBag("/tmp/experimental_dags", include_examples=False)
2026-05-27T20:05:13.302650Z [info ] Filling up the DagBag from /tmp/experimental_dags [airflow.dag_processing.dagbag.DagBag] loc=dagbag.py:469
{'simple_dag': <DAG: simple_dag>}
The dags attribute is a dictionary that maps DAG IDs to the actual DAG instances:
dagbag.dags
{'simple_dag': <DAG: simple_dag>}
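Conceptually, filling a DagBag means importing each Python file in the folder and collecting the DAG objects defined at module level into a dictionary keyed by DAG ID. The sketch below imitates that with importlib. To stay runnable without Airflow, the hypothetical collect_dags helper treats any module-level object with a string dag_id attribute as a DAG; this is a simplifying assumption, since the real DagBag checks for Airflow DAG instances and does much more, such as recording import errors.

```python
import importlib.util
import tempfile
from pathlib import Path


def collect_dags(folder: str) -> dict:
    """Import every .py file in folder and return {dag_id: object} (hypothetical helper)."""
    dags = {}
    for path in sorted(Path(folder).glob("*.py")):
        # Load the file as a standalone module.
        spec = importlib.util.spec_from_file_location(path.stem, str(path))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for obj in vars(module).values():
            dag_id = getattr(obj, "dag_id", None)
            # Duck typing stands in for isinstance(obj, DAG) in this sketch.
            if isinstance(dag_id, str) and not isinstance(obj, type):
                dags[dag_id] = obj
    return dags


# Usage: write one "DAG file" into a temporary folder and load it.
folder = tempfile.mkdtemp()
Path(folder, "simple_dag.py").write_text(
    "class FakeDag:\n"
    "    def __init__(self, dag_id):\n"
    "        self.dag_id = dag_id\n"
    "\n"
    "dag = FakeDag('simple_dag')\n"
)
dags = collect_dags(folder)
print(list(dags))  # ['simple_dag']
```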
Connections & Hooks
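Connections are objects that store credentials and connection parameters (such as host, port, and login) for external systems.
Hooks implement the interaction with external systems (databases, cloud services, APIs) and typically use a connection to authenticate.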
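To make the split between connections and hooks concrete, here is a minimal sketch that runs without a database server. MiniSqliteHook is a hypothetical class (not Airflow's SqliteHook): it looks up connection details by ID in a plain dictionary standing in for Airflow's connection store, opens the database connection on demand, and exposes a get_records method modeled on the one used by Airflow's SQL hooks.

```python
import sqlite3
from contextlib import closing

# Hypothetical connection store: conn_id -> connection details.
CONNECTIONS = {"my_sqlite": {"database": ":memory:"}}


class MiniSqliteHook:
    """Hypothetical hook: knows how to talk to SQLite; the connection says where."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    def get_conn(self) -> sqlite3.Connection:
        # Resolve the connection details by ID only when a connection is needed.
        details = CONNECTIONS[self.conn_id]
        return sqlite3.connect(details["database"])

    def get_records(self, sql: str) -> list[tuple]:
        # Open, query, and always close the underlying connection.
        with closing(self.get_conn()) as conn:
            return conn.execute(sql).fetchall()


hook = MiniSqliteHook(conn_id="my_sqlite")
rows = hook.get_records("select 1 + 1, 'hello'")
print(rows)  # [(2, 'hello')]
```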
Airflow loads connections from:
Environment variables.
External secrets backends (systems for storing credentials).
The internal metadata database.
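When a connection is requested, Airflow checks these sources in order and uses the first match; by default, a configured secrets backend is checked first, then environment variables, then the metadata database. For environment variables, the connection with ID my_conn is read from AIRFLOW_CONN_MY_CONN. The sketch below imitates that lookup chain with plain functions; the dictionaries standing in for the secrets backend and the database are hypothetical.

```python
import os
from typing import Callable, Optional

Backend = Callable[[str], Optional[str]]

# Hypothetical stand-ins for an external secrets store and the metadata database.
SECRETS_STORE: dict[str, str] = {}
METADATA_DB = {"my_postgres": "postgresql://db_user:db_pass@db-host:5432/db"}


def from_secrets_store(conn_id: str) -> Optional[str]:
    return SECRETS_STORE.get(conn_id)


def from_env(conn_id: str) -> Optional[str]:
    # Airflow's convention: AIRFLOW_CONN_<CONN_ID in upper case>.
    return os.environ.get(f"AIRFLOW_CONN_{conn_id.upper()}")


def from_metadata_db(conn_id: str) -> Optional[str]:
    return METADATA_DB.get(conn_id)


def get_connection_uri(conn_id: str, backends: list[Backend]) -> str:
    """Return the URI from the first backend that knows conn_id."""
    for backend in backends:
        uri = backend(conn_id)
        if uri is not None:
            return uri
    raise KeyError(f"connection {conn_id!r} not found")


search_path = [from_secrets_store, from_env, from_metadata_db]

# Start from a clean environment: the connection exists only in the "database".
os.environ.pop("AIRFLOW_CONN_MY_POSTGRES", None)
print(get_connection_uri("my_postgres", search_path))

# An environment variable takes precedence over the database.
os.environ["AIRFLOW_CONN_MY_POSTGRES"] = "postgresql://postgres:postgres@localhost:5432/postgres"
print(get_connection_uri("my_postgres", search_path))
```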
For a more detailed overview, see:
The Managing Connections page in the official documentation.
The Connections & Hooks page.
The following example defines a connection named my_postgres through an environment variable, creates a hook that uses this connection, and fetches some information with the hook.
Note that running this example requires a running PostgreSQL instance and the apache-airflow-providers-postgres package.
import os
from airflow.providers.postgres.hooks.postgres import PostgresHook
os.environ["AIRFLOW_CONN_MY_POSTGRES"] = (
    "postgresql://postgres:postgres@localhost:5432/postgres"
)
hook = PostgresHook(postgres_conn_id="my_postgres")
rows = hook.get_records(
"""
select * from pg_database;
"""
)
for row in rows:
    print(row)
2026-06-07T14:13:17.136574Z [info ] Running statement:
select * from pg_database;
, parameters: None [airflow.task.hooks.airflow.providers.postgres.hooks.postgres.PostgresHook] loc=sql.py:861
2026-06-07T14:13:17.137916Z [info ] Rows affected: 3 [airflow.task.hooks.airflow.providers.postgres.hooks.postgres.PostgresHook] loc=sql.py:879
(5, 'postgres', 10, 6, 'c', False, True, False, -1, '731', '1', 1663, 'en_US.utf8', 'en_US.utf8', None, None, '2.36', None)
(1, 'template1', 10, 6, 'c', True, True, False, -1, '731', '1', 1663, 'en_US.utf8', 'en_US.utf8', None, None, '2.36', '{=c/postgres,postgres=CTc/postgres}')
(4, 'template0', 10, 6, 'c', True, False, False, -1, '731', '1', 1663, 'en_US.utf8', 'en_US.utf8', None, None, None, '{=c/postgres,postgres=CTc/postgres}')
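The AIRFLOW_CONN_MY_POSTGRES value above packs all connection fields into a single URI. The sketch below uses urllib.parse from the standard library to show how such a URI breaks down into the fields of an Airflow connection (conn_type, login, password, host, port, schema). It is an approximation with a hypothetical parse_conn_uri helper: Airflow's Connection class does its own parsing, handles extras passed in the query string, and normalizes some schemes (for example, treating postgresql as postgres), whereas this sketch keeps the scheme as-is.

```python
from urllib.parse import unquote, urlsplit


def parse_conn_uri(uri: str) -> dict:
    """Split a connection URI into Airflow-style connection fields (sketch)."""
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        # Credentials may be URL-encoded, so decode them.
        "login": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname,
        "port": parts.port,
        # The URI path (without the leading slash) becomes the schema field.
        "schema": parts.path.lstrip("/") or None,
    }


fields = parse_conn_uri("postgresql://postgres:postgres@localhost:5432/postgres")
print(fields)
```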