Интеграция с Apache Airflow
Использование интеграции OpenLineage с Apache Airflow.
Требования
- Apache Airflow 2.x или 3.x
- OpenLineage 1.19.0 или выше, рекомендуется 1.34.0+
- Интеграция OpenLineage для Airflow (см. ниже)
Соответствие сущностей
- Airflow DAG → Data.Rentgen Job
- Airflow DAGRun → Data.Rentgen Run
- Airflow Task → Data.Rentgen Job
- Airflow TaskInstance → Data.Rentgen Run + Data.Rentgen Operation
Установка
- Для Airflow 2.7 или выше используйте apache-airflow-providers-openlineage 1.9.0 или выше:
$ pip install "apache-airflow-providers-openlineage>=2.3.0" "openlineage-python[kafka]>=1.34.0" zstd
...
- Для Airflow 2.1.x-2.6.x используйте интеграцию OpenLineage для Airflow 1.19.0 или выше:
$ pip install "openlineage-airflow>=1.34.0" "openlineage-python[kafka]>=1.34.0" zstd
...
Настройка
Через конфигурационный файл OpenLineage
- Создайте файл
openlineage.ymlс содержимым:
transport:
type: kafka
topic: input.runs
config:
bootstrap.servers: localhost:9093
security.protocol: SASL_PLAINTEXT
sasl.mechanism: SCRAM-SHA-256
sasl.username: data_rentgen
sasl.password: changeme
compression.type: zstd
acks: all
- Укажите путь к файлу конфигурации через переменную окружения
AIRFLOW__OPENLINEAGE__CONFIG_PATH:
AIRFLOW__OPENLINEAGE__NAMESPACE=http://airflow.hostname.fqdn:8080
AIRFLOW__OPENLINEAGE__CONFIG_PATH=/path/to/openlineage.yml
Через конфигурационный файл Airflow
Настройте интеграцию OpenLineage, используя файл конфигурации airflow.cfg:
[openlineage]
# укажите здесь адрес веб-интерфейса Airflow
namespace = http://airflow.hostname.fqdn:8080
# укажите здесь адрес подключения к Kafka и учетные данные
transport = {"type": "kafka", "config": {"bootstrap.servers": "localhost:9093", "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "data_rentgen", "sasl.password": "changeme", "compression.type": "zstd", "acks": "all"}, "topic": "input.runs", "flush": true}
Через переменные окружения Airflow
Установите переменные окружения для всех компонентов Airflow (например, через docker-compose.yml):
AIRFLOW__OPENLINEAGE__NAMESPACE=http://airflow.hostname.fqdn:8080
AIRFLOW__OPENLINEAGE__TRANSPORT={"type": "kafka", "config": {"bootstrap.servers": "localhost:9093", "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "data_rentgen", "sasl.password": "changeme", "compression.type": "zstd", "acks": "all"}, "topic": "input.runs", "flush": true}
Airflow 2.1.x и 2.2.x
Для Airflow 2.1-2.2 необходимо явно включить интеграцию OpenLineage, добавив запись в конфигурацию airflow.cfg:
[lineage]
backend=openlineage.lineage_backend.OpenLineageBackend
Или установив переменную окружения:
AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
Сбор и отправка данных lineage
Запустите любой DAG Airflow с задачами и дождитесь завершения. Данные о происхождении будут автоматически отправлены в Data.Rentgen с помощью интеграции OpenLineage.
Просмотр результатов
Перейдите на страницу Jobs в интерфейсе, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.


