Tasks#

A Task is the basic unit of execution in Airflow. This page focuses on details related to working with tasks.

Communication#

In Airflow, setting up communication between tasks in a DAG is crucial. Airflow provides a special mechanism for this called XCom (short for ‘Cross-Communication’). XCom is a system for passing small amounts of data between tasks by serializing Python objects and storing them in the metadata database. This allows tasks to share information and enables Airflow to track inter-task communication.
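To make the mechanism concrete before the TaskFlow example below, here is a minimal sketch of explicit XCom usage in the classic operator style. The dag_id "xcom_demo", the task ids, and the key "greeting" are illustrative names, not part of the example that follows.

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
import pendulum

def push_value(ti):
    # Explicitly store a value in the metadata database under a custom key.
    ti.xcom_push(key="greeting", value="hello")

def pull_value(ti):
    # Retrieve the value pushed by the upstream task.
    print(ti.xcom_pull(task_ids="push", key="greeting"))

@dag(dag_id="xcom_demo", start_date=pendulum.datetime(2022, 1, 1), catchup=False)
def xcom_demo():
    push = PythonOperator(task_id="push", python_callable=push_value)
    pull = PythonOperator(task_id="pull", python_callable=pull_value)
    push >> pull

xcom_demo()

A callable's return value is also pushed automatically under the default key return_value; the explicit calls above simply make the data flow visible.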

As an example, consider passing data between tasks in the modern Airflow syntax, the TaskFlow API. You work with tasks just like ordinary Python functions; everything related to XCom is handled by the task decorator.

The following cell defines a DAG in which the save task stores the string returned by the echo task on disk.

%%writefile /opt/airflow/dags/tutorial.py
from pathlib import Path
from airflow.decorators import dag, task
import pendulum

now = pendulum.now("UTC")

# DAG-level settings belong in the @dag decorator itself; parameters of
# the decorated function would become runtime params, not DAG attributes.
@dag(dag_id="etl", start_date=now, schedule="@daily", catchup=False)
def etl():
    @task
    def echo() -> str:
        # The return value is serialized and stored as an XCom.
        return "message"

    @task
    def save(temps: str) -> None:
        # The argument is pulled from XCom automatically.
        Path("/tmp/airflow_data").write_text(temps)

    # Passing echo()'s output to save() both declares the dependency
    # and routes the data through XCom.
    data = echo()
    save(data)

etl()
Overwriting /opt/airflow/dags/tutorial.py

The following cell runs the DAG and then displays the contents of the /tmp/airflow_data file.

%%bash
# Make sure the metadata database schema is up to date.
airflow db migrate &> /dev/null
# Clear any previous state for this date so the backfill reruns the tasks.
airflow tasks clear -s 2022-01-01 -e 2022-01-01 -y etl
# Run the DAG for the single day 2022-01-01.
airflow dags backfill -s 2022-01-01 -e 2022-01-01 etl
[2025-03-23T13:00:57.099+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-03-23T13:00:59.928+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-03-23T13:01:00.220+0000] {executor_loader.py:258} INFO - Loaded executor: SequentialExecutor
[2025-03-23T13:01:00.287+0000] {base_executor.py:169} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'etl', 'echo', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmp99vnbjxg']
[2025-03-23T13:01:00.293+0000] {sequential_executor.py:84} INFO - Executing command: ['airflow', 'tasks', 'run', 'etl', 'echo', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmp99vnbjxg']
[2025-03-23T13:01:02.332+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/tutorial.py
[2025-03-23T13:01:02.564+0000] {task_command.py:467} INFO - Running <TaskInstance: etl.echo backfill__2022-01-01T00:00:00+00:00 [queued]> on host db44a0789e04
[2025-03-23T13:01:03.312+0000] {backfill_job_runner.py:464} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2025-03-23T13:01:04.373+0000] {base_executor.py:169} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'etl', 'save', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpr556dona']
[2025-03-23T13:01:04.374+0000] {sequential_executor.py:84} INFO - Executing command: ['airflow', 'tasks', 'run', 'etl', 'save', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpr556dona']
[2025-03-23T13:01:06.432+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/tutorial.py
[2025-03-23T13:01:06.675+0000] {task_command.py:467} INFO - Running <TaskInstance: etl.save backfill__2022-01-01T00:00:00+00:00 [queued]> on host db44a0789e04
[2025-03-23T13:01:07.410+0000] {backfill_job_runner.py:303} WARNING - TaskInstanceKey(dag_id='etl', task_id='save', run_id='backfill__2022-01-01T00:00:00+00:00', try_number=3, map_index=-1) state success not in running=dict_values([<TaskInstance: etl.save backfill__2022-01-01T00:00:00+00:00 [queued]>])
[2025-03-23T13:01:07.415+0000] {dagrun.py:854} INFO - Marking run <DagRun etl @ 2022-01-01 00:00:00+00:00: backfill__2022-01-01T00:00:00+00:00, state:running, queued_at: 2025-03-23 13:00:57.326310+00:00. externally triggered: False> successful
[2025-03-23T13:01:07.415+0000] {dagrun.py:905} INFO - DagRun Finished: dag_id=etl, execution_date=2022-01-01 00:00:00+00:00, run_id=backfill__2022-01-01T00:00:00+00:00, run_start_date=2025-03-23 13:01:00.225884+00:00, run_end_date=2025-03-23 13:01:07.415813+00:00, run_duration=7.189929, state=success, external_trigger=False, run_type=backfill, data_interval_start=2022-01-01 00:00:00+00:00, data_interval_end=2022-01-02 00:00:00+00:00, dag_hash=None
[2025-03-23T13:01:07.416+0000] {backfill_job_runner.py:464} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2025-03-23T13:01:08.425+0000] {backfill_job_runner.py:1051} INFO - Backfill done for DAG <DAG: etl>. Exiting.
!cat /tmp/airflow_data
message

As a result, the exact string returned by the echo task was written to the file: the value travelled from echo to save through XCom.
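The same mechanism scales to tasks that feed several values downstream. A minimal sketch, assuming a layout similar to the DAG above (the dag_id "etl_multi" and the task names are illustrative): with multiple_outputs=True, each key of the returned dictionary is stored as a separate XCom entry.

from pathlib import Path
from airflow.decorators import dag, task
import pendulum

@dag(dag_id="etl_multi", start_date=pendulum.datetime(2022, 1, 1), catchup=False)
def etl_multi():
    @task(multiple_outputs=True)
    def extract() -> dict:
        # Each key of the dictionary becomes its own XCom entry.
        return {"path": "/tmp/airflow_data", "payload": "message"}

    @task
    def load(path: str, payload: str) -> None:
        Path(path).write_text(payload)

    data = extract()
    # Indexing the TaskFlow output pulls the individual XCom entries.
    load(path=data["path"], payload=data["payload"])

etl_multi()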