Интеграция с Apache Airflow
Использование интеграции OpenLineage с Apache Airflow.
Требования
- Apache Airflow 2.x или 3.x
- OpenLineage 1.19.0 или выше, рекомендуется 1.34.0+
- Интеграция OpenLineage для Airflow (см. ниже)
- Запуск брокера сообщений
- (Опционально) http2kafka
Соответствие сущностей
- Airflow DAG → Data.Rentgen Job
- Airflow DAGRun → Data.Rentgen Run
- Airflow Task → Data.Rentgen Job
- Airflow TaskInstance → Data.Rentgen Run + Data.Rentgen Operation
Установка
-
Для Airflow 2.7 или выше используйте apache-airflow-providers-openlineage 1.9.0 или выше:
$ pip install "apache-airflow-providers-openlineage>=2.6.1" "openlineage-python[kafka]>=1.40.1" zstd ...$ pip install "apache-airflow-providers-openlineage>=2.6.1" ... -
For Airflow 2.1.x-2.6.x, use OpenLineage integration for Airflow 1.19.0 or higher
$ pip install "openlineage-airflow>=1.40.1" "openlineage-python[kafka]>=1.40.1" zstd ...$ pip install "openlineage-airflow>=1.40.1" ...
Настройка
Через конфигурационный файл OpenLineage
-
Создайте файл
openlineage.ymlс содержимым:transport: type: kafka topic: input.runs config: # should be accessible from Airflow scheduler bootstrap.servers: localhost:9093 security.protocol: SASL_PLAINTEXT sasl.mechanism: SCRAM-SHA-256 # Kafka auth credentials sasl.username: data_rentgen sasl.password: changeme compression.type: zstd acks: alltransport: type: http # http2kafka URL, should be accessible from Airflow scheduler url: http://localhost:8002 endpoint: /v1/openlineage compression: gzip auth: type: api_key # create a PersonalToken, and pass it here apiKey: personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC -
Укажите путь к файлу конфигурации через переменную окружения
AIRFLOW__OPENLINEAGE__CONFIG_PATH:
AIRFLOW__OPENLINEAGE__NAMESPACE=http://airflow.hostname.fqdn:8080
AIRFLOW__OPENLINEAGE__CONFIG_PATH=/path/to/openlineage.yml
Через конфигурационный файл Airflow
Настройте интеграцию OpenLineage, используя файл конфигурации airflow.cfg:
[openlineage]
# set here address of Airflow Web UI
namespace = http://airflow.hostname.fqdn:8080
# set here Kafka connection address & credentials
transport = {"type": "kafka", "config": {"bootstrap.servers": "localhost:9093", "security.protocol":"SASL_PLAINTEXT", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "data_rentgen", "sasl.password": "changeme","compression.type": "zstd", "acks": "all"}, "topic": "input.runs", "flush": true}
[openlineage]
# set here address of Airflow Web UI
namespace = http://airflow.hostname.fqdn:8080
# set here HTTP2Kafka url & create PersonalToken
transport = {"type": "http", "url": "http://localhost:8002", "endpoint": "/v1/openlineage", "compression": "gzip","auth": {"type": "api_key", "apiKey": "personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC"}}
Через переменные окружения Airflow
Установите переменные окружения для всех компонентов Airflow (например, через docker-compose.yml):
# set here address of Airflow Web UI
AIRFLOW__OPENLINEAGE__NAMESPACE='http://airflow.hostname.fqdn:8080'
# set here Kafka broker address & auth credentials
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "kafka", "config": {"bootstrap.servers": "localhost:9093", "securityprotocol": "SASL_PLAINTEXT", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "data_rentgen", "sasl.password":"changeme", "compression. type": "zstd", "acks": "all"}, "topic": "input.runs", "flush": true}'
# set here address of Airflow Web UI
AIRFLOW__OPENLINEAGE__NAMESPACE='http://airflow.hostname.fqdn:8080'
# set here HTTP2Kafka url & create PersonalToken
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:8002", "endpoint": "/v1/openlineage","compression": "gzip", "auth": {"type": "api_key", "apiKey": "personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCC"}}'
Airflow 2.1.x и 2.2.x
Для Airflow 2.1-2.2 необходимо явно включить интеграцию OpenLineage, добавив запись в конфигурацию airflow.cfg:
[lineage]
backend=openlineage.lineage_backend.OpenLineageBackend
Или установив переменную окружения:
AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
Сбор и отправка данных lineage
Запустите любой DAG Airflow с задачами и дождитесь завершения. Данные о происхождении будут автоматически отправлены в Data.Rentgen с помощью интеграции OpenLineage.
Просмотр результатов
Перейдите на страницу Jobs в интерфейсе, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.
Страница списка заданий (Job)
Страница с детализацией информации по заданиям
Страница с детализацией информации по запускам
Страница с подробной информацией о задании
страница с подробными сведениями о запуске задачи
Lineage уровня заданий
Зависимости от заданий
Дополнительная конфигурация
Сбор меток DAG
По умолчанию создаются следующие теги заданий:
airflow.versionopenlineage_adapter.versionopenlineage_client.version(используйте OpenLineage client 1.38.0+)
Возможно ли также указать пользовательские теги для DAG? DataRentgen может извлекать теги только в формате ключ:значение, например:
from airflow.models import DAG
with DAG(
dag_id="mydag",
tags=["environment:production", "layer:bronze"],
)






