Tasks#
A task is the basic unit of execution in Airflow. This page focuses on details of working with tasks.
Communication#
In Airflow, setting up communication between tasks in a DAG is crucial. Airflow provides a dedicated mechanism for this called XCom (short for ‘cross-communications’). XCom passes small amounts of data between tasks by serializing Python objects and storing them in the metadata database. This allows tasks to share information and enables Airflow to track inter-task communication.
For more details, check:
XComs page.
Passing arbitrary objects as arguments section of the TaskFlow description.
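To make the mechanism concrete, here is a minimal sketch (not part of this tutorial's DAG) of the same kind of exchange done with the low-level XCom API: one task pushes a value through its task instance, the other pulls it. The xcom_example DAG id and the producer/consumer task names are illustrative assumptions; with TaskFlow, used in the rest of this page, this bookkeeping is generated for you by the task decorator.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def produce(ti):
    # Push a value under an explicit key; Airflow serializes it
    # into the metadata database.
    ti.xcom_push(key="message", value="message")


def consume(ti):
    # Pull the value pushed by the "producer" task.
    print(ti.xcom_pull(task_ids="producer", key="message"))


with DAG(
    dag_id="xcom_example",  # illustrative dag id
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    producer = PythonOperator(task_id="producer", python_callable=produce)
    consumer = PythonOperator(task_id="consumer", python_callable=consume)
    producer >> consumer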
As an example, consider passing arguments with the modern Airflow syntax, TaskFlow. You work with tasks just as you would with regular Python functions; everything related to XCom is handled by the task
decorator.
The following cell defines a DAG in which the save
task takes the string returned by the echo
task and writes it to disk.
%%writefile /opt/airflow/dags/tutorial.py
from pathlib import Path

import pendulum

from airflow.decorators import dag, task

now = pendulum.now("UTC")


# DAG-level settings belong in the decorator, not in the function signature.
@dag("etl", start_date=now, schedule="@daily", catchup=False)
def etl():
    @task
    def echo() -> str:
        return "message"

    @task
    def save(temps: str) -> None:
        # The value returned by `echo` arrives here via XCom.
        Path("/tmp/airflow_data").write_text(temps)

    data = echo()
    save(data)


etl()
Overwriting /opt/airflow/dags/tutorial.py
The following cell runs the DAG and displays the contents of the /tmp/airflow_data
file.
%%bash
airflow db migrate &> /dev/null
airflow tasks clear -s 2022-01-01 -e 2022-01-01 -y etl
airflow dags backfill -s 2022-01-01 -e 2022-01-01 etl
[2025-03-23T13:00:57.099+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-03-23T13:00:59.928+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags
[2025-03-23T13:01:00.220+0000] {executor_loader.py:258} INFO - Loaded executor: SequentialExecutor
[2025-03-23T13:01:00.287+0000] {base_executor.py:169} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'etl', 'echo', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmp99vnbjxg']
[2025-03-23T13:01:00.293+0000] {sequential_executor.py:84} INFO - Executing command: ['airflow', 'tasks', 'run', 'etl', 'echo', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmp99vnbjxg']
[2025-03-23T13:01:02.332+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/tutorial.py
[2025-03-23T13:01:02.564+0000] {task_command.py:467} INFO - Running <TaskInstance: etl.echo backfill__2022-01-01T00:00:00+00:00 [queued]> on host db44a0789e04
[2025-03-23T13:01:03.312+0000] {backfill_job_runner.py:464} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2025-03-23T13:01:04.373+0000] {base_executor.py:169} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'etl', 'save', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpr556dona']
[2025-03-23T13:01:04.374+0000] {sequential_executor.py:84} INFO - Executing command: ['airflow', 'tasks', 'run', 'etl', 'save', 'backfill__2022-01-01T00:00:00+00:00', '--depends-on-past', 'ignore', '--local', '--pool', 'default_pool', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpr556dona']
[2025-03-23T13:01:06.432+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/tutorial.py
[2025-03-23T13:01:06.675+0000] {task_command.py:467} INFO - Running <TaskInstance: etl.save backfill__2022-01-01T00:00:00+00:00 [queued]> on host db44a0789e04
[2025-03-23T13:01:07.410+0000] {backfill_job_runner.py:303} WARNING - TaskInstanceKey(dag_id='etl', task_id='save', run_id='backfill__2022-01-01T00:00:00+00:00', try_number=3, map_index=-1) state success not in running=dict_values([<TaskInstance: etl.save backfill__2022-01-01T00:00:00+00:00 [queued]>])
[2025-03-23T13:01:07.415+0000] {dagrun.py:854} INFO - Marking run <DagRun etl @ 2022-01-01 00:00:00+00:00: backfill__2022-01-01T00:00:00+00:00, state:running, queued_at: 2025-03-23 13:00:57.326310+00:00. externally triggered: False> successful
[2025-03-23T13:01:07.415+0000] {dagrun.py:905} INFO - DagRun Finished: dag_id=etl, execution_date=2022-01-01 00:00:00+00:00, run_id=backfill__2022-01-01T00:00:00+00:00, run_start_date=2025-03-23 13:01:00.225884+00:00, run_end_date=2025-03-23 13:01:07.415813+00:00, run_duration=7.189929, state=success, external_trigger=False, run_type=backfill, data_interval_start=2022-01-01 00:00:00+00:00, data_interval_end=2022-01-02 00:00:00+00:00, dag_hash=None
[2025-03-23T13:01:07.416+0000] {backfill_job_runner.py:464} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2025-03-23T13:01:08.425+0000] {backfill_job_runner.py:1051} INFO - Backfill done for DAG <DAG: etl>. Exiting.
!cat /tmp/airflow_data
message
As a result, the exact string generated by the echo
task was written to the file.
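As a side note, the return value of a TaskFlow task is stored as a regular XCom under the default key return_value, so it could also be consumed from a classic operator. Below is a minimal sketch, assuming it is placed inside the etl() function of the DAG above; the print_message task id and the use of BashOperator are illustrative assumptions.
from airflow.operators.bash import BashOperator

# Inside etl(), after `data = echo()`:
print_message = BashOperator(
    task_id="print_message",  # illustrative task id
    bash_command='echo "{{ ti.xcom_pull(task_ids=\'echo\', key=\'return_value\') }}"',
)
data >> print_message  # the XComArg also sets the echo -> print_message dependency
Either way, the data goes through the metadata database, so XCom should be reserved for small payloads.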