Skip to content

Интеграция с Apache Flink 2.x

Использование интеграции OpenLineage с Apache Flink 2.x.

Требования

Ограничения

  • В настоящее время нет возможности передавать теги заданий, смотри

Сопоставление сущностей

  • Flink job → Data.Rentgen Job
  • Flink job run → Data.Rentgen Run + Data.Rentgen Operation

Установка

  • Скачайте следующие jar-файлы и поместите их в директорию openlineage/jars/:

  • KafkaTransport:

  • HttpTransport (requires HTTP2Kafka):

  • Установите переменную окружения CLASSPATH для JobManager Flink, указывающую на путь к этой директории:

CLASSPATH=/path/to/openlineage/jars/
  • Настройте JobManager Flink для загрузки этих зависимостей, используя собственный ClassLoader:
config.yaml
# For KafkaTransport
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
# For HttpTransport
#classloader.parent-first-patterns.additional: ["io.openlineage."]

В противном случае Flink будет загружать все классы из classloader задания, что может привести к ошибкам типа:

org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer
java.util.ServiceConfigurationError: io.openlineage.client.transports.TransportBuilder: io.openlineage.client.transports.HttpTransportBuilder not a subtype

Подробнее см. документацию Flink.

Настройка

  • Добавьте OpenLineageJobStatusChangedListenerFactory в файл config.yaml Flink:
config.yaml
   # For KafkaTransport
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
   # For HttpTransport
   #classloader.parent-first-patterns.additional: ["io.openlineage."]

   # capture job events
execution.job-status-changed-listeners: io.openlineage.flink.listener.OpenLineageJobStatusChangedListenerFactory
   # capture job stop events
execution.attached: true
   # set namespace to match Flink address
execution.job-listener.openlineage.namespace: http://some.host.name:18081
   # set job name
execution.job-listener.openlineage.job-name: flink_examples_stateful
  • Создайте файл openlineage.yml со следующим содержимым:

    openlineage.yml
       # Send RUNNING event every 1 hour.
       # Using default interval (1 minute) just floods Kafka with useless RUNNING events.
    trackingIntervalInSeconds: 600
    transport:
        type: kafka
        topicName: input.runs
        properties:
            # should be accessible inside jobmanager container
            # not using localhost in docker!
            bootstrap.servers: broker:9092
            security.protocol: SASL_PLAINTEXT
            sasl.mechanism: SCRAM-SHA-256
            # Kafka auth credentials
            sasl.jaas.config: |
              org.apache.kafka.common.security.scram.ScramLoginModule required
              username="data_rentgen"
              password="changeme";
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.apache.kafka.common.serialization.StringSerializer
            compression.type: zstd
            acks: all
    
    openlineage.yml
       # Send RUNNING event every 1 hour.
       # Using default interval (1 minute) just floods Kafka with useless RUNNING events.
       trackingIntervalInSeconds: 3600
       transport:
           type: http
           url: http://http2kafka:8000  # not using localhost in docker
           endpoint: /v1/openlineage
           compression: gzip
           auth:
               type: api_key
               # create a PersonalToken, and pass it here
               apiKey: personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC
    
  • Передайте путь к файлу конфигурации через переменную окружения OPENLINEAGE_CONFIG для jobmanager:

OPENLINEAGE_CONFIG=/path/to/openlineage.yml

В итоге конфигурация должна выглядеть так (см. Официальную документацию):

docker-compose.yml
services:
    jobmanager:
        image: flink:2.0.0-scala_2.12-java11
        ports:
        - "18081:8081"
        # поддерживаются как standalone-job, так и jobmanager
        command: standalone-job --job-classname my.awesome.FlinkStatefulApplication
        volumes:
        - ./artifacts/:/opt/flink/usrlib/  # путь к файлам .jar вашего задания Flink, если используется standalone-job
        - ./config.yaml:/opt/flink/conf/config.yaml
        - ./openlineage/jars/:/opt/flink/usrlib/openlineage/
        - ./openlineage.yml:/opt/flink/conf/openlineage.yml
        environment:
        - CLASSPATH=/opt/flink/usrlib/openlineage/

    taskmanager:
        image: flink:2.0.0-scala_2.12-java11
        depends_on:
        - jobmanager
        command: taskmanager
        volumes:
        - ./artifacts/:/opt/flink/usrlib/  # путь к файлам .jar вашего задания Flink, если используется standalone-job
        - ./config.yaml:/opt/flink/conf/config.yaml

Сбор и отправка данных lineage

Просто запустите ваше задание (Job) Flink. Интеграция OpenLineage автоматически соберет и отправит данные lineage в DataRentgen.

Просмотр результатов

Просмотрите страницы фронтенда Jobs, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.

Страница списка заданий (Job)

список заданий (Job)

Страница сведений о задании (Job)

сведения о задании (Job)

Страница сведений о запуске (Run)

сведения о запуске (run)

Граф lineage на уровне набора данных (dataset)

dataset lineage

Граф lineage на уровне задания (Job)

Job lineage

Граф lineage на уровне запуска (Run)

run lineage