0.3.0 (2025-07-04)
Новые возможности
- Улучшена поддержка
openlineage-airflow(#210).
Раньше мы отслеживали только события начала/окончания DAG и Task, но не отслеживали lineage
Теперь мы сохраняем lineage, созданный операторами Airflow, такими как SQLExecuteQueryOperator.
- Добавлена поддержка openlineage-flink (#214).
- Добавлена поддержка openlineage-hive (#245).
- Добавлена поддержка openlineage-dbt (#223).
- Добавлена грануляция DATASET для GET /api/datasets/lineage (#235).
- Сохранение SQL-запросов, полученных от интеграций OpenLineage. (#213, #218).
Критические изменения
- Изменение
Output.typeв ответеGET /api/*/lineageсEnumнаList[Enum](#222).
Примеры ENUM в ответе lineage
=== До
{
"nodes": {...},
"relations": {
"outputs": [
{
"from": {"kind": "JOB", "id": 3981},
"to": {"kind": "DATASET", "id": 8400},
"types": "OVERWRITE", # <---
...
]
},
}
=== После
{
"nodes": {...},
"relations": {
"outputs": [
{
"from": {"kind": "JOB", "id": 3981},
"to": {"kind": "DATASET", "id": 8400},
"types": ["OVERWRITE", "DROP", "TRUNCATE"], # <---
...
]
},
}
Мы используем схему выходных данных, если она есть, иначе используем схему входных данных.
- Перемещение
Input.schemaиOutput.schemaвDataset.schemaв ответеGET /api/*/lineage(#249).
Примеры схемы в ответе lineage
=== До
{
"nodes": {
"datasets": {
"8400": {
"id": "8400",
"location": {...},
"name": "dataset_name",
...
}
},
"relations": {
"outputs": [
{
"from": {"kind": "JOB", "id": 3981},
"to": {"kind": "DATASET", "id": 8400},
"types": "OVERWRITE",
"schema": { # <---
"id": "10062",
"fields": [ ... ],
"relevance_type": "EXACT_MATCH"
]
]
},
}
=== После
{
"nodes": {
"datasets": {
"8400": {
"id": "25896",
"location": {...},
"name": "dataset_name",
"schema": { # <---
"id": "10062",
"fields": [...],
"relevance_type": "EXACT_MATCH"
},
...
}
}
...
},
"relations": {
"outputs": [
{
"from": {"kind": "JOB", "id": 3981},
"to": {"kind": "DATASET", "id": 8400},
"types": ["OVERWRITE", "DROP", "TRUNCATE"],
...
]
},
}
Улучшения
- Добавлен скрипт
cleanup_partitions.pyдля автоматической очистки старых партиций таблиц (#254). - Добавлен скрипт
data_rentgen.db.seed, который создаёт примеры данных в базе данных (#257). - Ускорено извлечение
RunиOperationиз базы данных по ID (#247). - Ускорено получение событий OpenLineage из Kafka (#236).
- Сделана более надежной обработка сообщений в консьюмере (#204).
Ранее некорректно сформированные события OpenLineage (JSON) приводили к пропуску всего пакета сообщений, считанного из Kafka.
Теперь сообщения обрабатываются по отдельности, а некорректные отправляются обратно в Kafka-топик input.runs__malformed.
- Улучшено хранение данных о происхождении для долгих операций (#253).
Описание
Ранее, если операция выполнялась долго (более дня, стриминговые задачи Flink могут легко работать месяцами или годами), и граф lineage строился за последний день, задача/запуск/операция Flink отсутствовали в графе.
Это происходило потому, что мы создавали записи о входных/выходных данных и столбцах при старте операции,
а события RUNNING той же операции (контрольные точки) просто обновляли статистику в той же строке.
Теперь мы создаем новые записи о входных/выходных данных и столбцах также и для событий контрольных точек. Но только одну запись за каждый час с момента запуска операции, так как увеличение количества строк замедляет построение графа происхождения.
Для короткоживущих операций (большинство пакетных операций занимает менее часа) поведение остается неизменным.
Исправления ошибок
- Исправлен шаблон URL для Airflow 3.x DAG и Task (#227).