Tutorial DAG#

Using this Jupyter notebook, you can run the DAG from this example.

DAG code#

The following is the DAG code that we just copied from the example into the prepared folder.

%%writefile tutorial_dag/tutorial_dag.py

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # provided that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
Overwriting tutorial_dag/tutorial_dag.py
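The templated command above relies on `macros.ds_add(ds, 7)`, which shifts the `ds` date stamp forward by seven days. A minimal stdlib sketch of what that macro computes (for illustration only, not Airflow's own code):

```python
from datetime import datetime, timedelta

def ds_add(ds: str, days: int) -> str:
    # mimic Airflow's macros.ds_add: shift a YYYY-MM-DD date string by `days`
    return (datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

print(ds_add("2015-06-01", 7))  # 2015-06-08
```

So for a run with `ds = "2015-06-01"`, each loop iteration of the templated command echoes the run date and the date one week later.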

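The last line of the DAG, `t1 >> [t2, t3]`, makes `t2` and `t3` downstream of `t1`. A toy sketch of how such a `>>` overload can record dependencies (assuming nothing about Airflow's internals):

```python
class Task:
    """Minimal stand-in for an operator, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # t1 >> t2  or  t1 >> [t2, t3]: register downstream tasks
        tasks = other if isinstance(other, list) else [other]
        self.downstream.extend(tasks)
        return other  # returning `other` allows chaining: t1 >> t2 >> t3

t1, t2, t3 = Task("print_date"), Task("sleep"), Task("templated")
t1 >> [t2, t3]
print([t.task_id for t in t1.downstream])  # ['sleep', 'templated']
```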
Container#

By running the following cell, you start an Airflow instance with the prepared example DAG mounted in.

!docker run -itd --rm \
    --name tutorial_dag \
    -p 8080:8080 \
    -v ./tutorial_dag:/root/airflow/dags \
    airflow_tests &> /dev/null

Let’s check that the tutorial DAG has been added to Airflow.

!docker exec tutorial_dag airflow dags list
dag_id   | filepath        | owner   | paused
=========+=================+=========+=======
tutorial | tutorial_dag.py | airflow | True  
                                             

Don’t forget to stop the container when you have finished playing with the example.

!docker stop tutorial_dag &> /dev/null

Test task#

By using the command `airflow tasks test`, you can execute a task and get its output right in the terminal, as in the example below.
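Conceptually, a test run of the `print_date` task boils down to executing its `bash_command` in a shell and capturing the output; a toy stdlib sketch of that idea (not Airflow's implementation):

```python
import subprocess

# run the task's bash_command ("date") in bash and capture its output,
# roughly what the test run reports in the logs below
result = subprocess.run(["bash", "-c", "date"], capture_output=True, text=True)
print(result.stdout.strip())
print("exit code:", result.returncode)
```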

Note: for some unknown reason, the first execution of this command raises a Python error. This shouldn’t affect us for now (the logs are fine), but I need to look into it more.

%%bash
echo "=====test====="
docker exec tutorial_dag airflow tasks test tutorial print_date 2015-06-01
=====test=====
[2023-09-24T13:00:55.874+0000] {dagbag.py:539} INFO - Filling up the DagBag from /root/airflow/dags
[2023-09-24T13:00:55.986+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.print_date __airflow_temporary_run_2023-09-24T13:00:44.709050+00:00__ [None]>
[2023-09-24T13:00:55.990+0000] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.print_date __airflow_temporary_run_2023-09-24T13:00:44.709050+00:00__ [None]>
[2023-09-24T13:00:55.990+0000] {taskinstance.py:1359} INFO - Starting attempt 1 of 2
[2023-09-24T13:00:55.990+0000] {taskinstance.py:1428} WARNING - cannot record queued_duration for task print_date because previous state change time has not been saved
[2023-09-24T13:00:55.991+0000] {taskinstance.py:1380} INFO - Executing <Task(BashOperator): print_date> on 2015-06-01 00:00:00+00:00
[2023-09-24T13:00:56.017+0000] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='airflow@example.com' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='print_date' AIRFLOW_CTX_EXECUTION_DATE='2015-06-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2023-09-24T13:00:44.709050+00:00__'
[2023-09-24T13:00:56.019+0000] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-09-24T13:00:56.020+0000] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'date']
[2023-09-24T13:00:56.027+0000] {subprocess.py:86} INFO - Output:
[2023-09-24T13:00:56.028+0000] {subprocess.py:93} INFO - Sun Sep 24 13:00:56 UTC 2023
[2023-09-24T13:00:56.028+0000] {subprocess.py:97} INFO - Command exited with return code 0
[2023-09-24T13:00:56.043+0000] {taskinstance.py:1398} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=print_date, execution_date=20150601T000000, start_date=, end_date=20230924T130056