Интеграция с Apache Flink 2.x
Использование интеграции OpenLineage с Apache Flink 2.x.
Требования
- Apache Flink 2.x
- OpenLineage 1.31.0 или выше, рекомендуется 1.40.1+
- Запуск брокера сообщений
- (Опционально) http2kafka
Ограничения
- В настоящее время нет возможности передавать теги заданий, смотри
Сопоставление сущностей
- Flink job → Data.Rentgen Job
- Flink job run → Data.Rentgen Run + Data.Rentgen Operation
Установка
-
Скачайте следующие jar-файлы и поместите их в директорию
openlineage/jars/: -
KafkaTransport:
-
HttpTransport (requires HTTP2Kafka):
-
Установите переменную окружения
CLASSPATHдляJobManagerFlink, указывающую на путь к этой директории:
CLASSPATH=/path/to/openlineage/jars/
- Настройте
JobManagerFlink для загрузки этих зависимостей, используя собственный ClassLoader:
# For KafkaTransport
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
# For HttpTransport
#classloader.parent-first-patterns.additional: ["io.openlineage."]
В противном случае Flink будет загружать все классы из classloader задания, что может привести к ошибкам типа:
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer
java.util.ServiceConfigurationError: io.openlineage.client.transports.TransportBuilder: io.openlineage.client.transports.HttpTransportBuilder not a subtype
Подробнее см. документацию Flink.
Настройка
- Добавьте
OpenLineageJobStatusChangedListenerFactoryв файлconfig.yamlFlink:
# For KafkaTransport
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
# For HttpTransport
#classloader.parent-first-patterns.additional: ["io.openlineage."]
# capture job events
execution.job-status-changed-listeners: io.openlineage.flink.listener.OpenLineageJobStatusChangedListenerFactory
# capture job stop events
execution.attached: true
# set namespace to match Flink address
execution.job-listener.openlineage.namespace: http://some.host.name:18081
# set job name
execution.job-listener.openlineage.job-name: flink_examples_stateful
-
Создайте файл
openlineage.ymlсо следующим содержимым:openlineage.yml# Send RUNNING event every 1 hour. # Using default interval (1 minute) just floods Kafka with useless RUNNING events. trackingIntervalInSeconds: 600 transport: type: kafka topicName: input.runs properties: # should be accessible inside jobmanager container # not using localhost in docker! bootstrap.servers: broker:9092 security.protocol: SASL_PLAINTEXT sasl.mechanism: SCRAM-SHA-256 # Kafka auth credentials sasl.jaas.config: | org.apache.kafka.common.security.scram.ScramLoginModule required username="data_rentgen" password="changeme"; key.serializer: org.apache.kafka.common.serialization.StringSerializer value.serializer: org.apache.kafka.common.serialization.StringSerializer compression.type: zstd acks: allopenlineage.yml# Send RUNNING event every 1 hour. # Using default interval (1 minute) just floods Kafka with useless RUNNING events. trackingIntervalInSeconds: 3600 transport: type: http url: http://http2kafka:8000 # not using localhost in docker endpoint: /v1/openlineage compression: gzip auth: type: api_key # create a PersonalToken, and pass it here apiKey: personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC -
Передайте путь к файлу конфигурации через переменную окружения
OPENLINEAGE_CONFIGдляjobmanager:
OPENLINEAGE_CONFIG=/path/to/openlineage.yml
В итоге конфигурация должна выглядеть так (см. Официальную документацию):
services:
jobmanager:
image: flink:2.0.0-scala_2.12-java11
ports:
- "18081:8081"
# поддерживаются как standalone-job, так и jobmanager
command: standalone-job --job-classname my.awesome.FlinkStatefulApplication
volumes:
- ./artifacts/:/opt/flink/usrlib/ # путь к файлам .jar вашего задания Flink, если используется standalone-job
- ./config.yaml:/opt/flink/conf/config.yaml
- ./openlineage/jars/:/opt/flink/usrlib/openlineage/
- ./openlineage.yml:/opt/flink/conf/openlineage.yml
environment:
- CLASSPATH=/opt/flink/usrlib/openlineage/
taskmanager:
image: flink:2.0.0-scala_2.12-java11
depends_on:
- jobmanager
command: taskmanager
volumes:
- ./artifacts/:/opt/flink/usrlib/ # путь к файлам .jar вашего задания Flink, если используется standalone-job
- ./config.yaml:/opt/flink/conf/config.yaml
Сбор и отправка данных lineage
Просто запустите ваше задание (Job) Flink. Интеграция OpenLineage автоматически соберет и отправит данные lineage в DataRentgen.
Просмотр результатов
Просмотрите страницы фронтенда Jobs, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.





