0.10.0 (2023-12-18)
Критические изменения
- Обновление
etl-entitiesс v1 до v2 (#172).
Это означает, что классы HWM теперь имеют другую внутреннюю структуру, чем раньше.
До:
from etl_entities.old_hwm import IntHWM as OldIntHWM
from etl_entities.source import Column, Table
from etl_entities.process import Process
hwm = OldIntHWM(
process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
source=Table(name="schema.table", instance="postgres://host:5432/db"),
column=Column(name="col1"),
value=123,
)
После:
from etl_entities.hwm import ColumnIntHWM
hwm = ColumnIntHWM(
name="some_unique_name",
description="any value you want",
source="schema.table",
expression="col1",
value=123,
)
Критическое изменение: Если вы использовали классы HWM из модуля etl_entities, вам следует переписать свой код, чтобы сделать его совместимым с новой версией.
Подробнее
- Классы
HWM, используемые предыдущими версиями onETL, были перемещены изetl_entitiesв подмодульetl_entities.old_hwm. Они сохранены для обеспечения совместимости, но будут удалены в выпускеetl-entitiesv3. - Новые классы
HWMимеют плоскую структуру вместо вложенной. - Новые классы
HWMимеют обязательный атрибутname(ранее известный какqualified_name). - Типы псевдонимов, используемые при сериализации и десериализации объектов
HWMв представленииdict, также были изменены:int→column_int.
Чтобы упростить миграцию, вы можете использовать новый метод:
old_hwm = OldIntHWM(...)
new_hwm = old_hwm.as_new_hwm()
Он автоматически преобразует все поля из старой структуры в новую, включая qualified_name → name.
-
Критические изменения:
-
Методы
BaseHWMStore.get()иBaseHWMStore.save()были переименованы вget_hwm()иset_hwm(). - Теперь их можно использовать только с новыми классами HWM из
etl_entities.hwm, старые классы HWM не поддерживаются.
Если вы использовали их в своем коде, пожалуйста, обновите его соответствующим образом.
- YAMLHWMStore НЕ МОЖЕТ читать файлы, созданные более старыми версиями onETL (0.9.x или старше).
Процедура обновления
# pip install onetl==0.9.5
# Get qualified_name for HWM
# Option 1. HWM is built manually
from etl_entities import IntHWM, FileListHWM
from etl_entities.source import Column, Table, RemoteFolder
from etl_entities.process import Process
# for column HWM
old_column_hwm = IntHWM(
process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
source=Table(name="schema.table", instance="postgres://host:5432/db"),
column=Column(name="col1"),
)
qualified_name = old_column_hwm.qualified_name
# "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
# for file HWM
old_file_hwm = FileListHWM(
process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
)
qualified_name = old_file_hwm.qualified_name
# "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"
# Option 2. HWM is generated automatically (by DBReader/FileDownloader)
# See onETL logs and search for string like qualified_name = '...'
qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"
# Get .yml file path by qualified_name
import os
from pathlib import PurePosixPath
from onetl.hwm.store import YAMLHWMStore
# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
hwm_path = yaml_hwm_store.get_file_path(qualified_name)
print(hwm_path)
# for column HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')
# for file HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')
# Read raw .yml file content
from yaml import safe_load, dump
raw_old_hwm_items = safe_load(hwm_path.read_text())
print(raw_old_hwm_items)
# for column HWM
# [
# {
# "column": { "name": "col1", "partition": {} },
# "modified_time": "2023-12-18T10: 39: 47.377378",
# "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
# "source": { "instance": "postgres: //host:5432/db", "name": "schema.table" },
# "type": "int",
# "value": "123",
# },
# ]
# for file HWM
# [
# {
# "modified_time": "2023-12-18T11:15:36.478462",
# "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
# "source": { "instance": "ftp://ftp.server:21", "name": "/absolute/path" },
# "type": "file_list",
# "value": ["file1.txt", "file2.txt"],
# },
# ]
# Convert file content to new structure, compatible with onETL 0.10.x
raw_new_hwm_items = []
for old_hwm in raw_old_hwm_items:
new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}
if "column" in old_hwm:
new_hwm["expression"] = old_hwm["column"]["name"]
new_hwm["entity"] = old_hwm["source"]["name"]
old_hwm.pop("process", None)
if old_hwm["type"] == "int":
new_hwm["type"] = "column_int"
new_hwm["value"] = old_hwm["value"]
elif old_hwm["type"] == "date":
new_hwm["type"] = "column_date"
new_hwm["value"] = old_hwm["value"]
elif old_hwm["type"] == "datetime":
new_hwm["type"] = "column_datetime"
new_hwm["value"] = old_hwm["value"]
elif old_hwm["type"] == "file_list":
new_hwm["type"] = "file_list"
new_hwm["value"] = [
os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
for path in old_hwm["value"]
]
else:
raise ValueError("WAT?")
raw_new_hwm_items.append(new_hwm)
print(raw_new_hwm_items)
# for column HWM
# [
# {
# "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
# "modified_time": "2023-12-18T10:39:47.377378",
# "expression": "col1",
# "source": "schema.table",
# "type": "column_int",
# "value": 123,
# },
# ]
# for file HWM
# [
# {
# "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
# "modified_time": "2023-12-18T11:15:36.478462",
# "entity": "/absolute/path",
# "type": "file_list",
# "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
# },
# ]
# Save file with new content
with open(hwm_path, "w") as file:
dump(raw_new_hwm_items, file)
# Stop Python interpreter and update onETL
# pip install onetl==0.10.0
# Check that new .yml file can be read
from onetl.hwm.store import YAMLHWMStore
qualified_name = ...
# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
yaml_hwm_store.get_hwm(qualified_name)
# for column HWM
# ColumnIntHWM(
# name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
# description='',
# entity='schema.table',
# value=123,
# expression='col1',
# modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
# )
# for file HWM
# FileListHWM(
# name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
# description='',
# entity=AbsolutePath('/absolute/path'),
# value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
# expression=None,
# modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462)
# )
# That's all!
Большинство пользователей используют другие реализации хранилища HWM, для которых указанная процедура не требуется.
- Некоторые классы и функции были перемещены из
onetlвetl_entities:
from onetl.hwm.store import (
detect_hwm_store,
BaseHWMStore,
HWMStoreClassRegistry,
register_hwm_store_class,
HWMStoreManager,
MemoryHWMStore,
)
from etl_entities.hwm_store import (
detect_hwm_store,
BaseHWMStore,
HWMStoreClassRegistry,
register_hwm_store_class,
HWMStoreManager,
MemoryHWMStore,
)
Их все еще можно импортировать из старого модуля, но они устарели и будут удалены в выпуске v1.0.0.
- Изменился способ передачи
HWMклассамDBReaderиFileDownloader:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | |
Новые классы HWM имеют обязательный атрибут name, который должен быть передан явно, вместо автоматической генерации.
Автоматическая генерация name с использованием старого синтаксиса DBReader.hwm_column / FileDownloader.hwm_type все еще поддерживается, но будет удалена в выпуске v1.0.0. (#179)
- Значительно улучшена производительность стратегий чтения Incremental и Batch. (#182).
Подробнее о том, что было до и после
DBReader.run() + поведение инкрементной/пакетной стратегии в версиях 0.9.x и старше:
- Получить схему таблицы, выполнив запрос
SELECT * FROM table WHERE 1=0(еслиDBReader.columnsсодержит*) - Развернуть
*в реальные имена столбцов из таблицы, добавить сюдаhwm_column, удалить дубликаты (поскольку некоторые СУБД этого не позволяют). - Создать dataframe из запроса, подобного
SELECT hwm_expression AS hwm_column, ...other table columns... FROM table WHERE hwm_expression > prev_hwm.value. - Определить класс HWM, используя схему dataframe:
df.schema[hwm_column].dataType. - Определить значение столбца x HWM, используя Spark:
df.select(max(hwm_column)).collect(). - Использовать
max(hwm_column)в качестве следующего значения HWM и сохранить его в HWM Store. - Вернуть dataframe пользователю.
Это было далеко от идеала:
-
Содержимое Dataframe (все строки или только измененные) загружалось из источника в Spark только для получения минимальных/максимальных значений определенного столбца.
-
Шаг получения схемы таблицы, а затем подстановки имен столбцов в следующем запросе вызывал некоторые неожиданные ошибки.
Например, если источник содержит столбцы со смешанным регистром имен, например
"CamelColumn"или"spaced column".Имена столбцов не экранировались во время генерации запроса, что приводило к запросам, которые не могли быть выполнены базой данных.
Поэтому пользователи должны были явно передавать имена столбцов
DBReader, заключая столбцы со смешанным регистром именования в":reader = DBReader( connection=..., source=..., columns=[ # passing '*' here leads to wrong SQL query generation "normal_column", '"CamelColumn"', '"spaced column"', ..., ], ) -
Использование
DBReaderсIncrementalStrategyмогло привести к чтению строк, уже прочитанных ранее.Dataframe был создан из запроса с предложением WHERE, подобным
hwm.expression > prev_hwm.value, а неhwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value.Поэтому, если в источнике появились новые строки после определения значения HWM, они могли быть прочитаны при доступе к содержимому dataframe (поскольку Spark использует "ленивые" вычисления), и это могло привести к несоответствиям между значением HWM и содержимого dataframe.
Что впоследствии могло привести к проблемам, в случае если
DBReader.run()прочитает некоторые данные, обновит значение HWM, а следующий вызовDBReader.run()прочитает строки, которые уже были прочитаны в предыдущем запуске.
DBReader.run() + поведение инкрементной/пакетной стратегии в версиях 0.10.x и новее:
- Определить тип выражения HWM:
SELECT hwm.expression FROM table WHERE 1=0. - Определить соответствующий тип Spark
df.schema[0]и затем определить соответствующий класс HWM (если используетсяDBReader.AutoDetectHWM). - Получить минимальные/максимальные значения, запросив источник:
SELECT MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value. - Использовать
max(hwm.expression)в качестве следующего значения HWM и сохранить его в HWM Store. - Создать dataframe из запроса
SELECT ... table columns ... FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value, внедрив новое значение HWM в запрос. - Вернуть dataframe пользователю.
Улучшения:
- Разрешение источнику вычислять min/max вместо загрузки всего набора данных в Spark. Это должно быть быстрее на больших объемах данных (до x2), потому что мы не передаем все данные из источника в Spark. Это может быть даже еще быстрее, если источник имеет индексы для столбца HWM.
- Список столбцов передается в источник как есть, без какой-либо обработки на стороне
DBReader. Таким образом, вы можете передатьDBReader(columns=["*"])для чтения таблиц со смешанным именованием столбцов. - Ограничение содержимого dataframe, чтобы оно всегда соответствовало значениям HWM, гарантирующее, что одна и та же строка никогда не будет прочитана дважды.
Критическое изменение: Столбец HWM больше не добавляется неявно в dataframe. Раньше он был частью предложения SELECT, но теперь он упоминается только в предложении WHERE.
Поэтому, если у вас был код, подобный этому, вам придется его переписать:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | |
Но поскольку большинство пользователей все равно используют columns=["*"], они не увидят никаких изменений.
FileDownloader.run()теперь обновляет HWM в HWM Store после обработки всех файлов, а не после загрузки каждого отдельного файла.
Причины изменений:
- FileDownloader можно использовать с
DownloadOptions(workers=N), что может привести к состоянию гонки - один поток может сохранить в HWM store одно значение HWM, а другой -- другое. - FileDownloader может загружать сотни и даже тысячи файлов, и отправка запроса в HWM Store для каждого файла может потенциально привести к DDoS HWM Store. (#189)
Обработчик исключений пытается сохранить HWM в HWM store, если процесс загрузки был прерван. Но если он был прерван принудительно, например, отправкой события SIGKILL, HWM не будет сохранен в HWM store, и тогда некоторые уже загруженные файлы могут быть загружены снова.
Кроме того, неожиданное завершение процесса может привести к другим негативным последствиям, например, некоторые файлы будут загружены частично, поэтому мы считаем такую реализацию ожидаемым поведением.
Функциональность
- Добавлена совместимость с Python 3.12. (#167)
- Формат файлов
Excelтеперь можно использовать с Spark 3.5.0. (#187) SnapshotBatchStagyиIncrementalBatchStrategyне вызывают исключений, если источник не содержит данных. Вместо этого они останавливаются на первой итерации и возвращают пустой dataframe. (#188)- Добавлено кэширование результата
connection.check()в классах высокого уровня, таких какDBReader,FileDownloaderи т. д. Это устраняет избыточную информативность логов. (#190)
Исправление ошибок
- Исправлены декораторы
@slotи@hook, возвращающие методы с отсутствующими аргументами в сигнатуре (Pylance, VS Code). (#183) - В документации Kafka connector говорилось, что он поддерживает инкрементное чтение данных топика путем передачи
group.idилиgroupIdPrefix. На самом деле это не так, потому что Spark не отправляет в Kafka информацию о том, какие сообщения были получены. Поэтому в настоящее время пользователи могут читать весь топик целиком, а инкрементное чтение не поддерживается.