API

This page explores the internal Python API of Airflow.

Operators & Tasks

A task is a unit of work (a piece of logic) in Airflow.

An operator is a template for a task; in fact, a task is an instance of an operator.
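To make the template/instance relationship concrete, here is a plain-Python toy model that does not need Airflow. ToyOperator and GreetOperator are hypothetical classes, not Airflow's API: the operator class defines how the work is done, and each instantiation with its own task_id and parameters is a separate task.

```python
class ToyOperator:
    """Hypothetical stand-in for an operator base class (not Airflow's BaseOperator)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def execute(self, context: dict):
        raise NotImplementedError  # concrete operators define the actual work


class GreetOperator(ToyOperator):
    """Hypothetical operator: the class is the template for greeting tasks."""

    def __init__(self, task_id: str, name: str) -> None:
        super().__init__(task_id)
        self.name = name

    def execute(self, context: dict) -> str:
        return f"hello, {self.name}"


# Two tasks built from the same operator (template), each with its own parameters.
greet_alice = GreetOperator(task_id="greet_alice", name="Alice")
greet_bob = GreetOperator(task_id="greet_bob", name="Bob")

print(greet_alice.execute({}))  # hello, Alice
print(greet_bob.execute({}))    # hello, Bob
```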


The following cell creates a task from the PythonOperator.

from unittest.mock import MagicMock
from airflow.providers.standard.operators.python import PythonOperator


def process_data():
    print("hello from task")


task = PythonOperator(
    task_id="process_data",
    python_callable=process_data,
)

The execute method runs the logic implemented by the task. Here it is called directly, with a mocked context:

task.execute({
    "task_instance": MagicMock(),
    "dag_run": MagicMock(),
    "logical_date": "2026-06-04",
})
2026-06-04T20:19:20.685280Z [warning  ] PythonOperator.execute cannot be called outside of the Task Runner! [airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator] loc=operator.py:437
hello from task
2026-06-04T20:19:20.688651Z [info     ] Done. Returned value was: None [airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator] loc=python.py:233
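Conceptually, the execute method of a PythonOperator calls the python_callable and returns its result, which is why the log above reports "Returned value was: None" (process_data returns nothing). The sketch below is a hypothetical simplification (ToyPythonOperator is not Airflow's class): the real operator also accepts op_args and can pass context values to the callable, while here only op_kwargs is modeled, and the printed message merely imitates the log line.

```python
from typing import Any, Callable, Optional


class ToyPythonOperator:
    """Hypothetical, simplified model of a PythonOperator-like execute."""

    def __init__(self, task_id: str, python_callable: Callable[..., Any],
                 op_kwargs: Optional[dict] = None) -> None:
        self.task_id = task_id
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}

    def execute(self, context: dict) -> Any:
        # Call the user function with its keyword arguments and hand back the result.
        result = self.python_callable(**self.op_kwargs)
        print(f"Done. Returned value was: {result}")
        return result


def add(a: int, b: int) -> int:
    return a + b


toy_task = ToyPythonOperator(task_id="add", python_callable=add, op_kwargs={"a": 2, "b": 3})
value = toy_task.execute({"logical_date": "2026-06-04"})  # prints "Done. Returned value was: 5"
```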

DAG

A DAG (directed acyclic graph) is a set of nodes (tasks) linked by dependencies that determine the order in which they are executed. In Airflow it is represented by the airflow.DAG class.

See the reference documentation of the DAG class for more details.


The following cell defines a simple DAG:

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


with DAG(dag_id="learning_dag") as dag:

    start = EmptyOperator(task_id="start")
    middle = EmptyOperator(task_id="middle")
    end = EmptyOperator(task_id="end")

    start >> middle >> end
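The >> operator in start >> middle >> end records a dependency and returns its right-hand operand, so a chain reads left to right: Python evaluates it as (start >> middle) >> end. A minimal sketch of that mechanism, using a hypothetical ToyTask class rather than Airflow's operator classes:

```python
class ToyTask:
    """Hypothetical task node that records upstream and downstream task ids."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: set[str] = set()
        self.downstream: set[str] = set()

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # self runs before other: record the edge self -> other on both nodes.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning the right operand is what makes a >> b >> c work


t_start, t_middle, t_end = ToyTask("start"), ToyTask("middle"), ToyTask("end")
t_start >> t_middle >> t_end
```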

As a result, the target of the context manager (dag) is the DAG instance:

dag
<DAG: learning_dag>

The tasks of the DAG:

dag.tasks
[<Task(EmptyOperator): start>,
 <Task(EmptyOperator): middle>,
 <Task(EmptyOperator): end>]

The tasks in topological order:

dag.topological_sort()
(<Task(EmptyOperator): start>,
 <Task(EmptyOperator): middle>,
 <Task(EmptyOperator): end>)
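A topological order lists every task after all of its upstream tasks. The sketch below uses Kahn's algorithm on a plain dictionary that maps each task id to its downstream task ids (a hypothetical representation); it illustrates the idea and is not necessarily how DAG.topological_sort is implemented internally. It also shows why the graph must be acyclic: with a cycle, no valid order exists.

```python
from collections import deque


def topological_order(edges: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm; edges maps each task id to its downstream task ids."""
    nodes = set(edges) | {d for ds in edges.values() for d in ds}
    indegree = {n: 0 for n in nodes}
    for downstream in edges.values():
        for d in downstream:
            indegree[d] += 1

    # Start from tasks with no upstream dependencies; sort for a deterministic result.
    ready = deque(sorted(n for n in nodes if indegree[n] == 0))
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for d in edges.get(node, []):
            indegree[d] -= 1
            if indegree[d] == 0:  # all upstream tasks of d are already placed
                ready.append(d)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle, so it is not a DAG")
    return order


print(topological_order({"start": ["middle"], "middle": ["end"]}))
# ['start', 'middle', 'end']
```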

The starting nodes of the graph (roots) and the final nodes (leaves):

dag.roots, dag.leaves
([<Task(EmptyOperator): start>], [<Task(EmptyOperator): end>])
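Roots are tasks without upstream dependencies and leaves are tasks without downstream ones. A sketch computing both from a plain dictionary that maps each task id to its downstream task ids (a hypothetical representation, not Airflow's internal one):

```python
def roots_and_leaves(edges: dict[str, list[str]]) -> tuple[list[str], list[str]]:
    """Roots have no upstream task; leaves have no downstream task."""
    nodes = set(edges) | {d for ds in edges.values() for d in ds}
    has_upstream = {d for ds in edges.values() for d in ds}
    has_downstream = {n for n, ds in edges.items() if ds}
    roots = sorted(nodes - has_upstream)
    leaves = sorted(nodes - has_downstream)
    return roots, leaves


# Hypothetical fan-out/fan-in graph: start -> (a, b) -> end
print(roots_and_leaves({"start": ["a", "b"], "a": ["end"], "b": ["end"]}))
# (['start'], ['end'])
```

A DAG can have several roots and several leaves; a task with no dependencies at all is both.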

DagBag

The DagBag object in Airflow is responsible for loading DAGs. You specify the folder where your DAGs are located; Airflow then "imports" the files and constructs a DAG object for each DAG they define.
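A rough sketch of what loading means: import each Python file in a folder and collect the DAG objects it defines, keyed by dag_id. load_dags is a hypothetical helper, not Airflow's implementation; the real DagBag does more (for example, it records import errors, honors .airflowignore, and checks for actual DAG instances), whereas this sketch accepts any module-level object with a string dag_id attribute so that it runs without Airflow.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_dags(folder: str) -> dict:
    """Hypothetical, simplified DagBag: import every .py file in folder and
    collect module-level objects that have a string dag_id attribute."""
    dags = {}
    for path in sorted(Path(folder).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, str(path))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file's top-level code, like an import
        for obj in vars(module).values():
            dag_id = getattr(obj, "dag_id", None)
            if isinstance(dag_id, str):
                dags[dag_id] = obj
    return dags


with tempfile.TemporaryDirectory() as folder:
    # A DAG file whose "DAG" is just a SimpleNamespace, to keep the sketch Airflow-free.
    Path(folder, "simple_dag.py").write_text(
        "from types import SimpleNamespace\n"
        "dag = SimpleNamespace(dag_id='simple_dag')\n"
    )
    dags = load_dags(folder)

print(sorted(dags))  # ['simple_dag']
```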


The following cell creates a folder containing a simple DAG that will be loaded by the DagBag object.

!mkdir -p /tmp/experimental_dags
%%writefile /tmp/experimental_dags/simple_dag.py
from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

with DAG(
    dag_id="simple_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    task = EmptyOperator(
        task_id="task",
    )
Overwriting /tmp/experimental_dags/simple_dag.py

Loading the DAGs:

from airflow.dag_processing.dagbag import DagBag
dagbag = DagBag("/tmp/experimental_dags", include_examples=False)
2026-05-27T20:05:13.302650Z [info     ] Filling up the DagBag from /tmp/experimental_dags [airflow.dag_processing.dagbag.DagBag] loc=dagbag.py:469
{'simple_dag': <DAG: simple_dag>}

The dags attribute is a dictionary that maps DAG IDs to the actual DAG instances.

dagbag.dags
{'simple_dag': <DAG: simple_dag>}

Connections & Hooks

Connections are objects that store the credentials and related parameters (host, port, login, password, and so on) needed to reach an external system.

Hooks implement the interaction with external systems (databases, cloud services, APIs); a hook typically uses a connection to authenticate.
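Airflow's SQL hooks (PostgresHook among them) build on a common DB-API base hook: roughly, the base class provides generic helpers such as get_records, and each database-specific subclass implements get_conn to open a connection. The sketch below imitates that split with the standard-library sqlite3 module; ToyDbApiHook, ToySqliteHook, and the TOY_CONNECTIONS registry are hypothetical stand-ins, not Airflow's classes or its connection lookup.

```python
import sqlite3
from typing import Any

# Hypothetical registry standing in for Airflow's connection lookup: conn_id -> database.
TOY_CONNECTIONS = {"my_sqlite": ":memory:"}


class ToyDbApiHook:
    """Generic part: runs SQL through any DB-API 2.0 connection."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    def get_conn(self):
        raise NotImplementedError  # each database-specific hook opens its own connection

    def get_records(self, sql: str) -> list[tuple[Any, ...]]:
        conn = self.get_conn()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            conn.close()


class ToySqliteHook(ToyDbApiHook):
    """Database-specific part: resolves the connection id and opens a sqlite3 connection."""

    def get_conn(self):
        return sqlite3.connect(TOY_CONNECTIONS[self.conn_id])


toy_hook = ToySqliteHook(conn_id="my_sqlite")
print(toy_hook.get_records("select 1 + 1, 'ok'"))  # [(2, 'ok')]
```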

Airflow loads connections from:

  • Environment variables.

  • External secrets backends (systems for storing credentials, such as HashiCorp Vault).

  • The Airflow metadata database.
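When a connection is requested by its id, these sources are searched in turn and the first match wins. According to the Airflow documentation, a configured secrets backend is searched first, then environment variables (named AIRFLOW_CONN_ followed by the upper-cased connection id), then the metadata database. The sketch below models that order with plain dictionaries; find_connection and the dictionaries are hypothetical stand-ins, and keying the secrets backend by the bare conn_id is a simplification (real backends use their own naming schemes).

```python
from typing import Mapping, Optional


def find_connection(
    conn_id: str,
    secrets_backend: Mapping[str, str],
    environ: Mapping[str, str],
    metastore: Mapping[str, str],
) -> Optional[str]:
    """Return the connection URI from the first source that defines conn_id, or None."""
    env_key = f"AIRFLOW_CONN_{conn_id.upper()}"  # env-var naming convention for connections
    for source, key in (
        (secrets_backend, conn_id),  # searched first, if configured
        (environ, env_key),
        (metastore, conn_id),        # searched last
    ):
        if key in source:
            return source[key]
    return None


environ = {"AIRFLOW_CONN_MY_POSTGRES": "postgresql://postgres:postgres@localhost:5432/postgres"}
metastore = {"my_postgres": "postgresql://other-host:5432/postgres", "my_mysql": "mysql://db:3306/app"}

print(find_connection("my_postgres", {}, environ, metastore))  # environment variable wins
print(find_connection("my_mysql", {}, environ, metastore))     # only the metastore defines it
```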

For a more detailed overview, see:


The following example defines a connection named my_postgres, creates a hook from this connection, and uses the hook to fetch some information from the database.

Note that this example requires a running PostgreSQL instance and the apache-airflow-providers-postgres package.

import os
from airflow.providers.postgres.hooks.postgres import PostgresHook


# Define the connection "my_postgres" through the AIRFLOW_CONN_<CONN_ID> environment variable.
os.environ["AIRFLOW_CONN_MY_POSTGRES"] = (
    "postgresql://postgres:postgres@localhost:5432/postgres"
)

hook = PostgresHook(postgres_conn_id="my_postgres")

rows = hook.get_records(
    """
    select * from pg_database;
    """
)

for row in rows:
    print(row)
2026-06-07T14:13:17.136574Z [info     ] Running statement: 
    select * from pg_database;
    , parameters: None [airflow.task.hooks.airflow.providers.postgres.hooks.postgres.PostgresHook] loc=sql.py:861
2026-06-07T14:13:17.137916Z [info     ] Rows affected: 3               [airflow.task.hooks.airflow.providers.postgres.hooks.postgres.PostgresHook] loc=sql.py:879
(5, 'postgres', 10, 6, 'c', False, True, False, -1, '731', '1', 1663, 'en_US.utf8', 'en_US.utf8', None, None, '2.36', None)
(1, 'template1', 10, 6, 'c', True, True, False, -1, '731', '1', 1663, 'en_US.utf8', 'en_US.utf8', None, None, '2.36', '{=c/postgres,postgres=CTc/postgres}')
(4, 'template0', 10, 6, 'c', True, False, False, -1, '731', '1', 1663, 'en_US.utf8', 'en_US.utf8', None, None, None, '{=c/postgres,postgres=CTc/postgres}')
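The value of AIRFLOW_CONN_MY_POSTGRES is a connection URI: the scheme gives the connection type, and the rest carries the login, password, host, port, and schema (database name). A sketch of splitting such a URI with urllib.parse; ToyConnection and parse_connection_uri are hypothetical, and Airflow's own Connection parser also handles details this sketch ignores, such as extra parameters in the query string.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import unquote, urlsplit


@dataclass
class ToyConnection:
    conn_type: str
    host: Optional[str]
    port: Optional[int]
    login: Optional[str]
    password: Optional[str]
    schema: str


def parse_connection_uri(uri: str) -> ToyConnection:
    parts = urlsplit(uri)
    return ToyConnection(
        conn_type=parts.scheme,
        host=parts.hostname,
        port=parts.port,
        # Credentials may be percent-encoded in the URI (e.g. '@' written as %40).
        login=unquote(parts.username) if parts.username else None,
        password=unquote(parts.password) if parts.password else None,
        schema=parts.path.lstrip("/"),  # the URI path holds the database name
    )


conn = parse_connection_uri("postgresql://postgres:postgres@localhost:5432/postgres")
print(conn)
```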