Интеграция с Apache Flink 1.x
Использование интеграции OpenLineage с Apache Flink 1.x.
Требования
- Apache Flink 1.x
- OpenLineage 1.31.0 или выше, рекомендуется 1.40.1+
- Запуск брокера сообщений
- (Опционально) http2kafka
Ограничения
- Поддерживается только режим
standalone-job(режим приложения), но неjobmanager(сессионный режим): проблема OpenLineage - В настоящее время нет возможности передавать теги заданий., смотри
Сопоставление сущностей
- Задача Flink → Задание Data.Rentgen
- Запуск задачи Flink → Запуск Data.Rentgen + Операция Data.Rentgen
Установка
- Добавьте зависимости openlineage-flink и kafka-clients в вашу задачу Flink:
build.gradle
implementation "io.openlineage:openlineage-flink:1.34.0"
implementation "org.apache.kafka:kafka-clients:3.9.0"
- Зарегистрируйте
OpenLineageFlinkJobListenerв коде вашей задачи Flink:
MyFlinkJob.java
import io.openlineage.flink.OpenLineageFlinkJobListener;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
JobListener listener = OpenLineageFlinkJobListener.builder()
.executionEnvironment(env)
.build();
env.registerJobListener(listener);
Настройка
- Измените файл
config.yamlFlink, чтобы он включал:
config.yaml
execution.attached: true # захватывать события остановки задач
-
Создайте файл
openlineage.ymlс содержимым вида:openlineage.ymljob: namespace: http://some.host.name:18081 # set namespace to match Flink address name: flink_examples_stateful # set job name # Send RUNNING event every 1 hour. # Using default interval (1 minute) just floods Kafka with useless RUNNING events. trackingIntervalInSeconds: 3600 transport: type: kafka topicName: input.runs properties: bootstrap.servers: broker:9092 # not using localhost in docker security.protocol: SASL_PLAINTEXT sasl.mechanism: SCRAM-SHA-256 sasl.jaas.config: | org.apache.kafka.common.security.scram.ScramLoginModule required username="data_rentgen" password="changeme"; key.serializer: org.apache.kafka.common.serialization.StringSerializer value.serializer: org.apache.kafka.common.serialization.StringSerializer compression.type: zstd acks: allopenlineage.ymljob: # set namespace to match Flink address namespace: http://some.host.name:18081 # set job name name: flink_examples_stateful # Send RUNNING event every 1 hour. # Using default interval (1 minute) just floods Kafka with useless RUNNING events. trackingIntervalInSeconds: 3600 transport: type: http # should be accessible inside jobmanager container # not using localhost in docker! url: http://http2kafka:8000 endpoint: /v1/openlineage compression: gzip auth: type: api_key # create a PersonalToken, and pass it here apiKey: personal_token_AAAAAAAAAAAA.BBBBBBBBBBBBBBBBBBBBBBB.CCCCCCCCCCCCCCCCCCCCC -
Передайте путь к файлу конфигурации через переменную окружения
OPENLINEAGE_CONFIGдляjobmanager:
OPENLINEAGE_CONFIG=/path/to/openlineage.yml
В итоге это должно выглядеть так (см. Официальную документацию):
docker-compose.yml
services:
jobmanager:
image: flink:1.20.1-scala_2.12-java11
ports:
- "18081:8081"
# поддерживается только standalone-job
command: standalone-job --job-classname my.awesome.FlinkStatefulApplication
volumes:
- ./artifacts/:/opt/flink/usrlib/ # путь к JAR-файлам вашей задачи Flink
- ./config.yaml:/opt/flink/conf/config.yaml
- ./openlineage.yml:/opt/flink/conf/openlineage.yml
environment:
- OPENLINEAGE_CONFIG=/path/to/openlineage.yml
taskmanager:
image: flink:1.20.1-scala_2.12-java11
depends_on:
- jobmanager
command: taskmanager
volumes:
- ./artifacts/:/opt/flink/usrlib/ # путь к JAR-файлам вашей задачи Flink
- ./config.yaml:/opt/flink/conf/config.yaml
Сбор и отправка данных о происхождении данных
Просто запустите вашу задачу Flink. Интеграция OpenLineage автоматически соберет и отправит данные о происхождении в DataRentgen.
Просмотр результатов
Просмотрите страницы интерфейса Jobs, чтобы увидеть, какая информация была извлечена OpenLineage и DataRentgen.





