Skip to content

Интеграция с Apache Flink 2.x

Использование интеграции OpenLineage с Apache Flink 2.x.

Требования

  • Apache Flink 2.x
  • OpenLineage 1.31.0 или выше, рекомендуется 1.34.0+

Сопоставление сущностей

  • Flink job → Data.Rentgen Job
  • Flink job run → Data.Rentgen Run + Data.Rentgen Operation

Установка

  • Скачайте следующие jar-файлы и поместите их в директорию openlineage/jars/:

  • openlineage-java

  • openlineage-flink
  • kafka-clients
  • zstd-jni

  • Установите переменную окружения CLASSPATH для JobManager Flink, указывающую на путь к этой директории:

CLASSPATH=/path/to/openlineage/jars/
  • Настройте JobManager Flink для загрузки этих зависимостей, используя собственный ClassLoader:
config.yaml
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]

В противном случае Flink будет загружать все классы из classloader задания, что может привести к ошибкам типа:

org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer
java.util.ServiceConfigurationError: io.openlineage.client.transports.TransportBuilder: io.openlineage.client.transports.HttpTransportBuilder not a subtype

Подробнее см. документацию Flink.

Настройка

  • Добавьте OpenLineageJobStatusChangedListenerFactory в файл config.yaml Flink:
config.yaml
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
execution.job-status-changed-listeners: io.openlineage.flink.listener.OpenLineageJobStatusChangedListenerFactory  # перехват событий задания
execution.attached: true  # перехват событий остановки задания
execution.job-listener.openlineage.namespace: http://some.host.name:18081  # установите пространство имен, чтобы соответствовать адресу Flink
execution.job-listener.openlineage.job-name: flink_examples_stateful  # установите имя задания
  • Создайте файл openlineage.yml со следующим содержимым:
openlineage.yml
# Отправлять событие RUNNING каждый час.
# Использование интервала по умолчанию (1 минута) просто перегружает Kafka бесполезными событиями RUNNING.
trackingIntervalInSeconds: 600

transport:
    type: kafka
    topicName: input.runs
    properties:
        bootstrap.servers: broker:9092  # не используем localhost в docker
        security.protocol: SASL_PLAINTEXT
        sasl.mechanism: SCRAM-SHA-256
        sasl.jaas.config: |
            org.apache.kafka.common.security.scram.ScramLoginModule required
            username="data_rentgen"
            password="changeme";
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
        compression.type: zstd
        acks: all
  • Передайте путь к файлу конфигурации через переменную окружения OPENLINEAGE_CONFIG для jobmanager:
OPENLINEAGE_CONFIG=/path/to/openlineage.yml

В итоге конфигурация должна выглядеть так (см. Официальную документацию):

docker-compose.yml
services:
    jobmanager:
        image: flink:2.0.0-scala_2.12-java11
        ports:
        - "18081:8081"
        # поддерживаются как standalone-job, так и jobmanager
        command: standalone-job --job-classname my.awesome.FlinkStatefulApplication
        volumes:
        - ./artifacts/:/opt/flink/usrlib/  # путь к файлам .jar вашего задания Flink, если используется standalone-job
        - ./config.yaml:/opt/flink/conf/config.yaml
        - ./openlineage/jars/:/opt/flink/usrlib/openlineage/
        - ./openlineage.yml:/opt/flink/conf/openlineage.yml
        environment:
        - CLASSPATH=/opt/flink/usrlib/openlineage/

    taskmanager:
        image: flink:2.0.0-scala_2.12-java11
        depends_on:
        - jobmanager
        command: taskmanager
        volumes:
        - ./artifacts/:/opt/flink/usrlib/  # путь к файлам .jar вашего задания Flink, если используется standalone-job
        - ./config.yaml:/opt/flink/conf/config.yaml

Сбор и отправка данных о происхождении

Просто запустите ваше задание (Job) Flink. Интеграция OpenLineage автоматически соберет и отправит данные о происхождении в DataRentgen.

Просмотр результатов

Просмотрите страницы фронтенда Jobs, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.

Страница списка заданий (Job)

список заданий (Job)

Страница сведений о задании (Job)

сведения о задании (Job)

Страница сведений о запуске (Run)

сведения о запуске (run)

Граф lineage на уровне набора данных (dataset)

dataset lineage

Граф lineage на уровне задания (Job)

Job lineage

Граф lineage на уровне запуска (Run)

run lineage