Connections & Hooks

A connection is an object in Airflow that stores the information needed to reach an external system, such as its host, port, login, password and schema.

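A hook implements the interaction with an external system through an interface that is compatible with the Airflow API.

The examples in this section use a connection defined through an environment variable: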

import os

os.environ["AIRFLOW_CONN_SOME_CONNECTION"] = "postgres://user:pass@host:2020/schema"
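The URI above packs all connection attributes into one string. The following minimal sketch uses the standard library's urllib.parse, not Airflow's own parser, to show roughly how the URI's pieces map onto the connection fields that appear in the outputs below:

```python
from urllib.parse import urlsplit

# The same URI stored in AIRFLOW_CONN_SOME_CONNECTION above.
uri = "postgres://user:pass@host:2020/schema"
parts = urlsplit(uri)

# Map each URI component to the matching connection attribute.
fields = {
    "conn_type": parts.scheme,         # "postgres"
    "login": parts.username,           # "user"
    "password": parts.password,        # "pass"
    "host": parts.hostname,            # "host"
    "port": parts.port,                # 2020 (parsed as an int)
    "schema": parts.path.lstrip("/"),  # "schema"
}
print(fields)
```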

Base hook

The airflow.sdk.bases.hook.BaseHook class is the base class for all hooks, so it is worth understanding its API.

Two of its most important methods have self-explanatory names:

  • [a]get_connection.

  • get_hook.

The "a" prefix marks the asynchronous variant of a method: aget_connection is the async version of get_connection.
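The following example illustrates only the naming convention. It uses a hypothetical class in which the a-prefixed method is a coroutine that must be awaited. FakeHookBase and its in-memory CONNECTIONS store are made up for this sketch, and Airflow does not implement aget_connection this way:

```python
import asyncio

# Hypothetical in-memory store standing in for a real connection backend.
CONNECTIONS = {"some_connection": {"conn_type": "postgres", "host": "host"}}


class FakeHookBase:
    """Hypothetical stand-in showing the sync/async method naming pattern."""

    @classmethod
    def get_connection(cls, conn_id: str) -> dict:
        # Synchronous lookup: returns the result directly.
        return CONNECTIONS[conn_id]

    @classmethod
    async def aget_connection(cls, conn_id: str) -> dict:
        # Async variant: runs the blocking lookup in a worker thread
        # so that it can be awaited from an event loop.
        return await asyncio.to_thread(cls.get_connection, conn_id)


sync_result = FakeHookBase.get_connection("some_connection")
async_result = asyncio.run(FakeHookBase.aget_connection("some_connection"))
print(sync_result == async_result)  # True
```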


The following code uses these methods to load the connection and the hook from the environment variable created above:

from airflow.sdk.bases.hook import BaseHook
print("connection:", BaseHook.get_connection("some_connection"))
print("hook:", BaseHook.get_hook("some_connection"))
connection: Connection(conn_id='some_connection', conn_type='postgres', description=None, host='host', schema='schema', login='user', password='pass', port=2020, extra=None)
hook: <airflow.providers.postgres.hooks.postgres.PostgresHook object at 0x7daef239a660>

Connection

All connections follow the API defined by airflow.sdk.definitions.connection.Connection. Its basic methods are:

  • The get class method fetches a connection by its ID.

  • The get_hook method returns the hook that corresponds to this connection.
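get_hook chooses the hook class from the connection's conn_type. In the output above, postgres maps to PostgresHook. In Airflow, that mapping comes from the installed providers.

The sketch below mimics the dispatch with a hypothetical registry dictionary and a dummy hook class. HOOK_REGISTRY, DummyPostgresHook and get_hook_for are illustrative names, not Airflow APIs:

```python
from dataclasses import dataclass


@dataclass
class DummyPostgresHook:
    """Hypothetical hook: it only remembers which connection it should use."""
    postgres_conn_id: str


# Hypothetical registry: conn_type -> (hook class, name of its conn-id argument).
HOOK_REGISTRY = {"postgres": (DummyPostgresHook, "postgres_conn_id")}


def get_hook_for(conn_id: str, conn_type: str):
    """Look up the hook class registered for conn_type and build it for conn_id."""
    try:
        hook_class, conn_id_arg = HOOK_REGISTRY[conn_type]
    except KeyError:
        raise ValueError(f"No hook registered for conn_type {conn_type!r}") from None
    return hook_class(**{conn_id_arg: conn_id})


hook = get_hook_for("some_connection", "postgres")
print(hook)  # DummyPostgresHook(postgres_conn_id='some_connection')
```

The sketch passes the connection ID under a hook-specific keyword. This is meant to resemble, as far as we understand it, how Airflow constructs hooks such as PostgresHook(postgres_conn_id=...).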


The following code fetches the connection from the environment variable.

from airflow.sdk.definitions.connection import Connection

connection = Connection.get("some_connection")
connection.to_dict()
{'conn_id': 'some_connection',
 'conn_type': 'postgres',
 'description': None,
 'host': 'host',
 'login': 'user',
 'password': 'pass',
 'schema': 'schema',
 'port': 2020,
 'extra': {}}

The following cell shows the PostgresHook instance created using this connection.

connection.get_hook()
<airflow.providers.postgres.hooks.postgres.PostgresHook at 0x7daef249afd0>

Note that Connection.__init__ does not fetch any information about the connection; it only creates an object with the specified attributes. The following cell constructs a Connection from scratch:

connection = Connection(conn_id="some_connection", conn_type="value")
connection.to_dict()
{'conn_id': 'some_connection',
 'conn_type': 'value',
 'description': None,
 'host': None,
 'login': None,
 'password': None,
 'schema': None,
 'port': None,
 'extra': {}}

Environment variable

Connection information can be stored in an environment variable named according to the convention AIRFLOW_CONN_<CONNECTION_ID>. The connection is then available through the interfaces above under the connection ID that follows the prefix. Airflow uppercases the ID when it looks up the variable, so AIRFLOW_CONN_SOME_CONNECTION is accessible as some_connection.
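The naming rule amounts to prefixing the uppercased connection ID. Here is a minimal sketch; env_var_for is a hypothetical helper, not an Airflow function:

```python
import os

CONN_ENV_PREFIX = "AIRFLOW_CONN_"


def env_var_for(conn_id: str) -> str:
    """Return the environment variable name checked for conn_id (hypothetical helper)."""
    return CONN_ENV_PREFIX + conn_id.upper()


os.environ[env_var_for("some_connection")] = "postgres://user:pass@host:2020/schema"
print(env_var_for("some_connection"))  # AIRFLOW_CONN_SOME_CONNECTION
print(os.environ["AIRFLOW_CONN_SOME_CONNECTION"])
```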

The value can be in one of two formats: JSON or URI.
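Both formats live in the same kind of variable, so the value itself has to distinguish them. The following rough sketch assumes, as we understand Airflow's secrets backends to behave, that a value starting with { is treated as JSON and anything else as a URI. detect_format is a hypothetical helper:

```python
import json


def detect_format(value: str) -> str:
    """Classify a connection value as 'json' or 'uri' (hypothetical helper)."""
    value = value.strip()
    if value.startswith("{"):
        json.loads(value)  # raises json.JSONDecodeError if the JSON is malformed
        return "json"
    return "uri"


print(detect_format('{"conn_type": "postgres", "host": "host"}'))  # json
print(detect_format("postgres://user:pass@host:2020/schema"))      # uri
```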

The examples below use these imports:

import os
from airflow.sdk.definitions.connection import Connection

The following cell defines an environment variable in JSON format:

os.environ["AIRFLOW_CONN_JSON_CON"] = """{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}"""

Connection.get(conn_id="json_con").to_dict()
{'conn_id': 'json_con',
 'conn_type': 'my_conn_type',
 'description': None,
 'host': 'my-host',
 'login': 'my-login',
 'password': 'my-password',
 'schema': 'my-schema',
 'port': 1234,
 'extra': {'param1': 'val1', 'param2': 'val2'}}
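Note that conn_type came back as my_conn_type, with the hyphens of the JSON value replaced by underscores. The following minimal sketch reproduces that normalization step with the standard json module. normalize_conn_type is a hypothetical helper, and Airflow's own normalization may do more than this:

```python
import json

raw = """{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "port": 1234,
    "extra": {"param1": "val1", "param2": "val2"}
}"""


def normalize_conn_type(conn_type: str) -> str:
    """Replace hyphens with underscores, matching the output above."""
    return conn_type.replace("-", "_")


data = json.loads(raw)
data["conn_type"] = normalize_conn_type(data["conn_type"])
print(data["conn_type"])  # my_conn_type
print(data["extra"])      # {'param1': 'val1', 'param2': 'val2'}
```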

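The next cell defines a connection as a URI. The scheme here, conn_type, contains an underscore, which is not a valid character in a URI scheme. As a result, the value is not split into its components as intended: note the empty conn_type and host and the garbled schema in the output.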

os.environ["AIRFLOW_CONN_ENV"] = "conn_type://user:pasword@host:1488/schema_value?extra=data"

Connection.get(conn_id="env").to_dict()
{'conn_id': 'env',
 'conn_type': '',
 'description': None,
 'host': '',
 'login': None,
 'password': None,
 'schema': 'onn_type//user:pasword@host:1488/schema_value',
 'port': None,
 'extra': {'extra': 'data'}}
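The standard library's urllib.parse.urlsplit shows the difference between the underscore and the hyphenated spelling of the scheme. This sketch is independent of Airflow's parser:

```python
from urllib.parse import urlsplit

bad = urlsplit("conn_type://user:pasword@host:1488/schema_value?extra=data")
good = urlsplit("conn-type://user:pasword@host:1488/schema_value?extra=data")

# With "_" no scheme is detected, so everything before "?" lands in the path.
print(repr(bad.scheme), bad.path)
# With "-" the URI splits into the expected components.
print(good.scheme, good.hostname, good.port, good.username, good.path, good.query)
```

Writing the type with hyphens (conn-type://...) should therefore parse as intended. Judging by the JSON example above, Airflow would then store it as conn_type.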