Интеграция с Apache Flink 2.x
Использование интеграции OpenLineage с Apache Flink 2.x.
Требования
- Apache Flink 2.x
- OpenLineage 1.31.0 или выше, рекомендуется 1.34.0+
Сопоставление сущностей
- Flink job → Data.Rentgen Job
- Flink job run → Data.Rentgen Run + Data.Rentgen Operation
Установка
-
Скачайте следующие jar-файлы и поместите их в директорию
openlineage/jars/: - openlineage-flink
- kafka-clients
-
Установите переменную окружения
CLASSPATHдляJobManagerFlink, указывающую на путь к этой директории:
CLASSPATH=/path/to/openlineage/jars/
- Настройте
JobManagerFlink для загрузки этих зависимостей, используя собственный ClassLoader:
config.yaml
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
В противном случае Flink будет загружать все классы из classloader задания, что может привести к ошибкам типа:
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer
java.util.ServiceConfigurationError: io.openlineage.client.transports.TransportBuilder: io.openlineage.client.transports.HttpTransportBuilder not a subtype
Подробнее см. документацию Flink.
Настройка
- Добавьте
OpenLineageJobStatusChangedListenerFactoryв файлconfig.yamlFlink:
config.yaml
classloader.parent-first-patterns.additional: ["io.openlineage.", "org.apache.kafka.","com.github.luben."]
execution.job-status-changed-listeners: io.openlineage.flink.listener.OpenLineageJobStatusChangedListenerFactory # перехват событий задания
execution.attached: true # перехват событий остановки задания
execution.job-listener.openlineage.namespace: http://some.host.name:18081 # установите пространство имен, чтобы соответствовать адресу Flink
execution.job-listener.openlineage.job-name: flink_examples_stateful # установите имя задания
- Создайте файл
openlineage.ymlсо следующим содержимым:
openlineage.yml
# Отправлять событие RUNNING каждый час.
# Использование интервала по умолчанию (1 минута) просто перегружает Kafka бесполезными событиями RUNNING.
trackingIntervalInSeconds: 600
transport:
type: kafka
topicName: input.runs
properties:
bootstrap.servers: broker:9092 # не используем localhost в docker
security.protocol: SASL_PLAINTEXT
sasl.mechanism: SCRAM-SHA-256
sasl.jaas.config: |
org.apache.kafka.common.security.scram.ScramLoginModule required
username="data_rentgen"
password="changeme";
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
compression.type: zstd
acks: all
- Передайте путь к файлу конфигурации через переменную окружения
OPENLINEAGE_CONFIGдляjobmanager:
OPENLINEAGE_CONFIG=/path/to/openlineage.yml
В итоге конфигурация должна выглядеть так (см. Официальную документацию):
docker-compose.yml
services:
jobmanager:
image: flink:2.0.0-scala_2.12-java11
ports:
- "18081:8081"
# поддерживаются как standalone-job, так и jobmanager
command: standalone-job --job-classname my.awesome.FlinkStatefulApplication
volumes:
- ./artifacts/:/opt/flink/usrlib/ # путь к файлам .jar вашего задания Flink, если используется standalone-job
- ./config.yaml:/opt/flink/conf/config.yaml
- ./openlineage/jars/:/opt/flink/usrlib/openlineage/
- ./openlineage.yml:/opt/flink/conf/openlineage.yml
environment:
- CLASSPATH=/opt/flink/usrlib/openlineage/
taskmanager:
image: flink:2.0.0-scala_2.12-java11
depends_on:
- jobmanager
command: taskmanager
volumes:
- ./artifacts/:/opt/flink/usrlib/ # путь к файлам .jar вашего задания Flink, если используется standalone-job
- ./config.yaml:/opt/flink/conf/config.yaml
Сбор и отправка данных о происхождении
Просто запустите ваше задание (Job) Flink. Интеграция OpenLineage автоматически соберет и отправит данные о происхождении в DataRentgen.
Просмотр результатов
Просмотрите страницы фронтенда Jobs, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.





