Skip to content

Интеграция с Apache Airflow

Использование интеграции OpenLineage с Apache Airflow.

Требования

Соответствие сущностей

  • Airflow DAG → Data.Rentgen Job
  • Airflow DAGRun → Data.Rentgen Run
  • Airflow Task → Data.Rentgen Job
  • Airflow TaskInstance → Data.Rentgen Run + Data.Rentgen Operation

Установка

  • Для Airflow 2.7 или выше используйте apache-airflow-providers-openlineage 1.9.0 или выше:

    $ pip install "apache-airflow-providers-openlineage>=2.6.1" "openlineage-python[kafka]>=1.40.1" zstd
    ...
    
    $ pip install "apache-airflow-providers-openlineage>=2.6.1"
    ...
    
  • For Airflow 2.1.x-2.6.x, use OpenLineage integration for Airflow 1.19.0 or higher

    $ pip install "openlineage-airflow>=1.40.1" "openlineage-python[kafka]>=1.40.1" zstd
    ...
    
    $ pip install "openlineage-airflow>=1.40.1"
    ...
    

Настройка

Через конфигурационный файл OpenLineage

  • Создайте файл openlineage.yml с содержимым:

    transport:
        type: kafka
        topic: input.runs
        config:
            # should be accessible from Airflow scheduler
            bootstrap.servers: localhost:9093
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: SCRAM-SHA-256
            # Kafka auth credentials
            sasl.username: data_rentgen
            sasl.password: changeme
            compression.type: zstd
            acks: all
    
    transport:
      type: http
       # http2kafka URL, should be accessible from Airflow scheduler
       url: http://localhost:8002
       endpoint: /v1/openlineage
       compression: gzip
       auth:
        type: api_key
        # create a PersonalToken, and pass it here
        apiKey: personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC
    
  • Укажите путь к файлу конфигурации через переменную окружения AIRFLOW__OPENLINEAGE__CONFIG_PATH:

AIRFLOW__OPENLINEAGE__NAMESPACE=http://airflow.hostname.fqdn:8080
AIRFLOW__OPENLINEAGE__CONFIG_PATH=/path/to/openlineage.yml

Через конфигурационный файл Airflow

Настройте интеграцию OpenLineage, используя файл конфигурации airflow.cfg:

[openlineage]
# set here address of Airflow Web UI
namespace = http://airflow.hostname.fqdn:8080
# set here Kafka connection address & credentials
transport = {"type": "kafka", "config": {"bootstrap.servers": "localhost:9093", "security.protocol":"SASL_PLAINTEXT", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "data_rentgen", "sasl.password": "changeme","compression.type": "zstd", "acks":   "all"},   "topic": "input.runs", "flush": true}
[openlineage]
# set here address of Airflow Web UI
namespace = http://airflow.hostname.fqdn:8080
# set here HTTP2Kafka url & create PersonalToken
transport = {"type": "http", "url": "http://localhost:8002", "endpoint": "/v1/openlineage", "compression": "gzip","auth": {"type": "api_key", "apiKey": "personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC"}}

Через переменные окружения Airflow

Установите переменные окружения для всех компонентов Airflow (например, через docker-compose.yml):

# set here address of Airflow Web UI
AIRFLOW__OPENLINEAGE__NAMESPACE='http://airflow.hostname.fqdn:8080'
# set here Kafka broker address & auth credentials
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "kafka", "config": {"bootstrap.servers": "localhost:9093", "securityprotocol": "SASL_PLAINTEXT", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "data_rentgen", "sasl.password":"changeme", "compression.  type":   "zstd", "acks": "all"}, "topic": "input.runs", "flush": true}'
# set here address of Airflow Web UI
AIRFLOW__OPENLINEAGE__NAMESPACE='http://airflow.hostname.fqdn:8080'
# set here HTTP2Kafka url & create PersonalToken
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:8002", "endpoint": "/v1/openlineage","compression": "gzip", "auth": {"type": "api_key", "apiKey": "personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCCCCCCC"}}'

Airflow 2.1.x и 2.2.x

Для Airflow 2.1-2.2 необходимо явно включить интеграцию OpenLineage, добавив запись в конфигурацию airflow.cfg:

[lineage]
backend=openlineage.lineage_backend.OpenLineageBackend

Или установив переменную окружения:

AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend

Сбор и отправка данных lineage

Запустите любой DAG Airflow с задачами и дождитесь завершения. Данные о происхождении будут автоматически отправлены в Data.Rentgen с помощью интеграции OpenLineage.

Просмотр результатов

Перейдите на страницу Jobs в интерфейсе, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.

Страница списка заданий (Job)

список задач (Job)

Страница с детализацией информации по заданиям

dag_job_details

Страница с детализацией информации по запускам

dag run details

Страница с подробной информацией о задании

task_job_details

страница с подробными сведениями о запуске задачи

task run details

Lineage уровня заданий

job_lineage

Зависимости от заданий

job_hierarchy

Дополнительная конфигурация

Сбор меток DAG

По умолчанию создаются следующие теги заданий:

  • airflow.version
  • openlineage_adapter.version
  • openlineage_client.version (используйте OpenLineage client 1.38.0+)

Возможно ли также указать пользовательские теги для DAG? DataRentgen может извлекать теги только в формате ключ:значение, например:

mydag.py
from airflow.models import DAG

with DAG(
  dag_id="mydag",
  tags=["environment:production", "layer:bronze"],
)