Connections & Hooks#
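A Connection is an object in Airflow that stores the information needed to reach an external system.
A Hook is the implementation of the interaction with an external system, exposed through an Airflow-compatible API.
The following cell defines a connection through an environment variable; the examples on this page use it.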
import os
os.environ["AIRFLOW_CONN_SOME_CONNECTION"] = "postgres://user:pass@host:2020/schema"
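The value assigned above is a URI that packs the connection type, login, password, host, port, and schema into one string. As a minimal sketch, the same string can be assembled from its parts with the standard library. The helper name `build_conn_uri` is hypothetical and not part of Airflow, and percent-encoding the credentials is an assumption that only matters when they contain reserved characters such as `@` or `/`.

```python
import os
from urllib.parse import quote


def build_conn_uri(conn_type, login, password, host, port, schema):
    """Assemble a URI of the form type://login:password@host:port/schema.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # Percent-encode the credentials so reserved characters do not break the URI.
    credentials = f"{quote(login, safe='')}:{quote(password, safe='')}"
    return f"{conn_type}://{credentials}@{host}:{port}/{quote(schema, safe='')}"


uri = build_conn_uri("postgres", "user", "pass", "host", 2020, "schema")
os.environ["AIRFLOW_CONN_SOME_CONNECTION"] = uri
print(uri)
```

For the plain values used here the result is identical to the hand-written string; the encoding step changes the output only for credentials with special characters.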
Base hook#
The airflow.sdk.bases.hook.BaseHook class is the base class for hooks, so understanding its API is crucial.
There are two important methods with self-explanatory names:
[a]get_connection.
get_hook.
Note that the optional a prefix denotes the async version of the method.
The following code uses these methods to load the connection and the hook from the environment variable created earlier:
from airflow.sdk.bases.hook import BaseHook
print("connection:", BaseHook.get_connection("some_connection"))
print("hook:", BaseHook.get_hook("some_connection"))
connection: Connection(conn_id='some_connection', conn_type='postgres', description=None, host='host', schema='schema', login='user', password='pass', port=2020, extra=None)
hook: <airflow.providers.postgres.hooks.postgres.PostgresHook object at 0x7daef239a660>
Connection#
All connections follow the API defined in airflow.sdk.definitions.connection.Connection. The basic methods are:
The get class method fetches a connection by its ID.
The get_hook method returns the hook that corresponds to this connection.
The following code fetches the connection from the environment variable.
from airflow.sdk.definitions.connection import Connection
connection = Connection.get("some_connection")
connection.to_dict()
{'conn_id': 'some_connection',
'conn_type': 'postgres',
'description': None,
'host': 'host',
'login': 'user',
'password': 'pass',
'schema': 'schema',
'port': 2020,
'extra': {}}
The following cell shows the PostgresHook instance created using this connection.
connection.get_hook()
<airflow.providers.postgres.hooks.postgres.PostgresHook at 0x7daef249afd0>
Note that Connection.__init__ does not fetch any stored connection information; it simply creates a connection object with the specified attributes. The following cell shows constructing a Connection from scratch:
connection = Connection(conn_id="some_connection", conn_type="value")
connection.to_dict()
{'conn_id': 'some_connection',
'conn_type': 'value',
'description': None,
'host': None,
'login': None,
'password': None,
'schema': None,
'port': None,
'extra': {}}
Environment variable#
The connection information can be stored in an environment variable that follows the naming convention AIRFLOW_CONN_<CONNECTION_ID>. The connection is then accessible through the interfaces described above under the connection ID given by the suffix of the variable name.
The value can be written in one of two formats: JSON or URI.
Check:
Storing connections in environment variables for official reference.
URI section of the reference.
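The naming convention can be sketched as a small function that maps a connection ID to its variable name. The helper `conn_env_var_name` is hypothetical and not part of Airflow, and upper-casing the ID is an assumption based on the examples on this page, where `AIRFLOW_CONN_SOME_CONNECTION` is read back as `some_connection`.

```python
CONN_ENV_PREFIX = "AIRFLOW_CONN_"  # prefix from the naming convention above


def conn_env_var_name(conn_id: str) -> str:
    """Return the environment variable name for a connection ID.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # Assumption: the ID is upper-cased, matching the examples on this page.
    return CONN_ENV_PREFIX + conn_id.upper()


name = conn_env_var_name("some_connection")
print(name)
```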
import os
from airflow.sdk.definitions.connection import Connection
The following cell sets the environment variable in JSON format:
os.environ["AIRFLOW_CONN_JSON_CON"] = """{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}"""
Connection.get(conn_id="json_con").to_dict()
{'conn_id': 'json_con',
'conn_type': 'my_conn_type',
'description': None,
'host': 'my-host',
'login': 'my-login',
'password': 'my-password',
'schema': 'my-schema',
'port': 1234,
'extra': {'param1': 'val1', 'param2': 'val2'}}
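Hand-written JSON strings are easy to get wrong (a trailing comma or single quotes is enough). As a hedged alternative, the same value can be generated from a dictionary with the standard `json` module. The sketch below reuses the field names from the example above and touches no Airflow API.

```python
import json
import os

# Same fields as in the hand-written example above.
fields = {
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {"param1": "val1", "param2": "val2"},
}

# json.dumps guarantees syntactically valid JSON in the variable.
os.environ["AIRFLOW_CONN_JSON_CON"] = json.dumps(fields)

# Round-trip to confirm that the stored value parses back to the same data.
restored = json.loads(os.environ["AIRFLOW_CONN_JSON_CON"])
print(restored["port"], restored["extra"])
```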
The following cell sets the environment variable in URI format:
os.environ["AIRFLOW_CONN_ENV"] = "conn_type://user:pasword@host:1488/schema_value?extra=data"
Connection.get(conn_id="env").to_dict()
{'conn_id': 'env',
'conn_type': '',
'description': None,
'host': '',
'login': None,
'password': None,
'schema': 'onn_type//user:pasword@host:1488/schema_value',
'port': None,
'extra': {'extra': 'data'}}
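The output above shows that the URI was not split into the expected fields: conn_type and host are empty, and most of the string ends up in schema. A likely cause, stated here as an assumption rather than a confirmed description of Airflow internals, is the underscore in the scheme `conn_type`: URI schemes may contain only letters, digits, `+`, `-`, and `.`. The sketch below uses only Python's `urllib.parse.urlsplit` to compare the original string with a hyphenated variant.

```python
from urllib.parse import urlsplit

# The URI from the cell above: the scheme contains an underscore.
with_underscore = urlsplit("conn_type://user:pasword@host:1488/schema_value?extra=data")
# The same URI with a hyphen in the scheme instead.
with_hyphen = urlsplit("conn-type://user:pasword@host:1488/schema_value?extra=data")

# No scheme or host is recognized; everything before "?" is treated as the path.
print(repr(with_underscore.scheme), repr(with_underscore.netloc), with_underscore.path)

# Scheme, host, port, and path are separated as expected.
print(with_hyphen.scheme, with_hyphen.hostname, with_hyphen.port, with_hyphen.path)
```

This only demonstrates the behavior of the standard-library parser; how Airflow treats a hyphenated connection type afterwards is not covered here.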