Skip to content

0.10.0 (2023-12-18)

Критические изменения

  • Обновление etl-entities с v1 до v2 (#172).

Это означает, что классы HWM теперь имеют другую внутреннюю структуру, чем раньше.

До:

from etl_entities.old_hwm import IntHWM as OldIntHWM
from etl_entities.source import Column, Table
from etl_entities.process import Process

hwm = OldIntHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=Table(name="schema.table", instance="postgres://host:5432/db"),
    column=Column(name="col1"),
    value=123,
)

После:

from etl_entities.hwm import ColumnIntHWM

hwm = ColumnIntHWM(
    name="some_unique_name",
    description="any value you want",
    source="schema.table",
    expression="col1",
    value=123,
)

Критическое изменение: Если вы использовали классы HWM из модуля etl_entities, вам следует переписать свой код, чтобы сделать его совместимым с новой версией.

Подробнее
  • Классы HWM, используемые предыдущими версиями onETL, были перемещены из etl_entities в подмодуль etl_entities.old_hwm. Они сохранены для обеспечения совместимости, но будут удалены в выпуске etl-entities v3.
  • Новые классы HWM имеют плоскую структуру вместо вложенной.
  • Новые классы HWM имеют обязательный атрибут name (ранее известный как qualified_name).
  • Типы псевдонимов, используемые при сериализации и десериализации объектов HWM в представлении dict, также были изменены: intcolumn_int.

Чтобы упростить миграцию, вы можете использовать новый метод:

old_hwm = OldIntHWM(...)
new_hwm = old_hwm.as_new_hwm()

Он автоматически преобразует все поля из старой структуры в новую, включая qualified_namename.

  • Критические изменения:

  • Методы BaseHWMStore.get() и BaseHWMStore.save() были переименованы в get_hwm() и set_hwm().

  • Теперь их можно использовать только с новыми классами HWM из etl_entities.hwm, старые классы HWM не поддерживаются.

Если вы использовали их в своем коде, пожалуйста, обновите его соответствующим образом.

  • YAMLHWMStore НЕ МОЖЕТ читать файлы, созданные более старыми версиями onETL (0.9.x или старше).
Процедура обновления
# pip install onetl==0.9.5

# Get qualified_name for HWM


# Option 1. HWM is built manually
from etl_entities import IntHWM, FileListHWM
from etl_entities.source import Column, Table, RemoteFolder
from etl_entities.process import Process

# for column HWM
old_column_hwm = IntHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=Table(name="schema.table", instance="postgres://host:5432/db"),
    column=Column(name="col1"),
)
qualified_name = old_column_hwm.qualified_name
# "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"

# for file HWM
old_file_hwm = FileListHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
)
qualified_name = old_file_hwm.qualified_name
# "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost"


# Option 2. HWM is generated automatically (by DBReader/FileDownloader)
# See onETL logs and search for string like qualified_name = '...'

qualified_name = "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost"


# Get .yml file path by qualified_name

import os
from pathlib import PurePosixPath
from onetl.hwm.store import YAMLHWMStore

# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
hwm_path = yaml_hwm_store.get_file_path(qualified_name)
print(hwm_path)

# for column HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')

# for file HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')


# Read raw .yml file content

from yaml import safe_load, dump

raw_old_hwm_items = safe_load(hwm_path.read_text())
print(raw_old_hwm_items)

# for column HWM
# [
#   {
#     "column": { "name": "col1", "partition": {} },
#     "modified_time": "2023-12-18T10: 39: 47.377378",
#     "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
#     "source": { "instance": "postgres: //host:5432/db", "name": "schema.table" },
#     "type": "int",
#     "value": "123",
#   },
# ]

# for file HWM
# [
#   {
#     "modified_time": "2023-12-18T11:15:36.478462",
#     "process": { "dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc" },
#     "source": { "instance": "ftp://ftp.server:21", "name": "/absolute/path" },
#     "type": "file_list",
#     "value": ["file1.txt", "file2.txt"],
#   },
# ]


# Convert file content to new structure, compatible with onETL 0.10.x
raw_new_hwm_items = []
for old_hwm in raw_old_hwm_items:
    new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}

    if "column" in old_hwm:
        new_hwm["expression"] = old_hwm["column"]["name"]
    new_hwm["entity"] = old_hwm["source"]["name"]
    old_hwm.pop("process", None)

    if old_hwm["type"] == "int":
        new_hwm["type"] = "column_int"
        new_hwm["value"] = old_hwm["value"]

    elif old_hwm["type"] == "date":
        new_hwm["type"] = "column_date"
        new_hwm["value"] = old_hwm["value"]

    elif old_hwm["type"] == "datetime":
        new_hwm["type"] = "column_datetime"
        new_hwm["value"] = old_hwm["value"]

    elif old_hwm["type"] == "file_list":
        new_hwm["type"] = "file_list"
        new_hwm["value"] = [
            os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
            for path in old_hwm["value"]
        ]

    else:
        raise ValueError("WAT?")

    raw_new_hwm_items.append(new_hwm)


print(raw_new_hwm_items)
# for column HWM
# [
#   {
#     "name": "col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost",
#     "modified_time": "2023-12-18T10:39:47.377378",
#     "expression": "col1",
#     "source": "schema.table",
#     "type": "column_int",
#     "value": 123,
#   },
# ]

# for file HWM
# [
#   {
#     "name": "file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost",
#     "modified_time": "2023-12-18T11:15:36.478462",
#     "entity": "/absolute/path",
#     "type": "file_list",
#     "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
#   },
# ]


# Save file with new content
with open(hwm_path, "w") as file:
    dump(raw_new_hwm_items, file)


# Stop Python interpreter and update onETL
# pip install onetl==0.10.0
# Check that new .yml file can be read

from onetl.hwm.store import YAMLHWMStore

qualified_name = ...

# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
yaml_hwm_store.get_hwm(qualified_name)

# for column HWM
# ColumnIntHWM(
#     name='col1#schema.table@postgres://host:5432/db#cde.abc.myprocess@myhost',
#     description='',
#     entity='schema.table',
#     value=123,
#     expression='col1',
#     modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
# )

# for file HWM
# FileListHWM(
#     name='file_list#/absolute/path@ftp://ftp.server:21#cde.abc.myprocess@myhost',
#     description='',
#     entity=AbsolutePath('/absolute/path'),
#     value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
#     expression=None,
#     modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462)
# )


# That's all!

Большинство пользователей используют другие реализации хранилища HWM, для которых указанная процедура не требуется.

  • Некоторые классы и функции были перемещены из onetl в etl_entities:
    from onetl.hwm.store import (
        detect_hwm_store,
        BaseHWMStore,
        HWMStoreClassRegistry,
        register_hwm_store_class,
        HWMStoreManager,
        MemoryHWMStore,
    )
    from etl_entities.hwm_store import (
        detect_hwm_store,
        BaseHWMStore,
        HWMStoreClassRegistry,
        register_hwm_store_class,
        HWMStoreManager,
        MemoryHWMStore,
    )

Их все еще можно импортировать из старого модуля, но они устарели и будут удалены в выпуске v1.0.0.

  • Изменился способ передачи HWM классам DBReader и FileDownloader:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Simple
reader = DBReader(
    connection=...,
    source=...,
    hwm_column="col1",
)





# Complex
reader = DBReader(
    connection=...,
    source=...,
    hwm_column=(
        "col1",
        "cast(col1 as date)",
    ),
)


# Files
downloader = FileDownloader(
    connection=...,
    source_path=...,
    target_path=...,
    hwm_type="file_list",
)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Simple
reader = DBReader(
    connection=...,
    source=...,
    hwm=DBReader.AutoDetectHWM(
        # name is mandatory now!
        name="my_unique_hwm_name",
        expression="col1",
    ),
)

# Complex
reader = DBReader(
    connection=...,
    source=...,
    hwm=DBReader.AutoDetectHWM(
        # name is mandatory now!
        name="my_unique_hwm_name",
        expression="cast(col1 as date)",
    ),
)

# Files
downloader = FileDownloader(
    connection=...,
    source_path=...,
    target_path=...,
    hwm=FileListHWM(
        # name is mandatory now!
        name="another_unique_hwm_name",
    ),
)

Новые классы HWM имеют обязательный атрибут name, который должен быть передан явно, вместо автоматической генерации.

Автоматическая генерация name с использованием старого синтаксиса DBReader.hwm_column / FileDownloader.hwm_type все еще поддерживается, но будет удалена в выпуске v1.0.0. (#179)

  • Значительно улучшена производительность стратегий чтения Incremental и Batch. (#182).
Подробнее о том, что было до и после

DBReader.run() + поведение инкрементной/пакетной стратегии в версиях 0.9.x и старше:

  • Получить схему таблицы, выполнив запрос SELECT * FROM table WHERE 1=0 (если DBReader.columns содержит *)
  • Развернуть * в реальные имена столбцов из таблицы, добавить сюда hwm_column, удалить дубликаты (поскольку некоторые СУБД этого не позволяют).
  • Создать dataframe из запроса, подобного SELECT hwm_expression AS hwm_column, ...other table columns... FROM table WHERE hwm_expression > prev_hwm.value.
  • Определить класс HWM, используя схему dataframe: df.schema[hwm_column].dataType.
  • Определить значение столбца x HWM, используя Spark: df.select(max(hwm_column)).collect().
  • Использовать max(hwm_column) в качестве следующего значения HWM и сохранить его в HWM Store.
  • Вернуть dataframe пользователю.

Это было далеко от идеала:

  • Содержимое Dataframe (все строки или только измененные) загружалось из источника в Spark только для получения минимальных/максимальных значений определенного столбца.

  • Шаг получения схемы таблицы, а затем подстановки имен столбцов в следующем запросе вызывал некоторые неожиданные ошибки.

    Например, если источник содержит столбцы со смешанным регистром имен, например "CamelColumn" или "spaced column".

    Имена столбцов не экранировались во время генерации запроса, что приводило к запросам, которые не могли быть выполнены базой данных.

    Поэтому пользователи должны были явно передавать имена столбцов DBReader, заключая столбцы со смешанным регистром именования в ":

    reader = DBReader(
        connection=...,
        source=...,
        columns=[  # passing '*' here leads to wrong SQL query generation
            "normal_column",
            '"CamelColumn"',
            '"spaced column"',
            ...,
        ],
    )
    
  • Использование DBReader с IncrementalStrategy могло привести к чтению строк, уже прочитанных ранее.

    Dataframe был создан из запроса с предложением WHERE, подобным hwm.expression > prev_hwm.value, а не hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value.

    Поэтому, если в источнике появились новые строки после определения значения HWM, они могли быть прочитаны при доступе к содержимому dataframe (поскольку Spark использует "ленивые" вычисления), и это могло привести к несоответствиям между значением HWM и содержимого dataframe.

    Что впоследствии могло привести к проблемам, в случае если DBReader.run() прочитает некоторые данные, обновит значение HWM, а следующий вызов DBReader.run() прочитает строки, которые уже были прочитаны в предыдущем запуске.

DBReader.run() + поведение инкрементной/пакетной стратегии в версиях 0.10.x и новее:

  • Определить тип выражения HWM: SELECT hwm.expression FROM table WHERE 1=0.
  • Определить соответствующий тип Spark df.schema[0] и затем определить соответствующий класс HWM (если используется DBReader.AutoDetectHWM).
  • Получить минимальные/максимальные значения, запросив источник: SELECT MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value.
  • Использовать max(hwm.expression) в качестве следующего значения HWM и сохранить его в HWM Store.
  • Создать dataframe из запроса SELECT ... table columns ... FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value, внедрив новое значение HWM в запрос.
  • Вернуть dataframe пользователю.

Улучшения:

  • Разрешение источнику вычислять min/max вместо загрузки всего набора данных в Spark. Это должно быть быстрее на больших объемах данных (до x2), потому что мы не передаем все данные из источника в Spark. Это может быть даже еще быстрее, если источник имеет индексы для столбца HWM.
  • Список столбцов передается в источник как есть, без какой-либо обработки на стороне DBReader. Таким образом, вы можете передать DBReader(columns=["*"]) для чтения таблиц со смешанным именованием столбцов.
  • Ограничение содержимого dataframe, чтобы оно всегда соответствовало значениям HWM, гарантирующее, что одна и та же строка никогда не будет прочитана дважды.

Критическое изменение: Столбец HWM больше не добавляется неявно в dataframe. Раньше он был частью предложения SELECT, но теперь он упоминается только в предложении WHERE.

Поэтому, если у вас был код, подобный этому, вам придется его переписать:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",
    ],
    hwm_column="hwm_col",
)

df = reader.run()
# hwm_column value is in the dataframe
assert df.columns == ["col1", "col2", "hwm_col"]




reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",
    ],
    hwm_column=(
        "hwm_col",
        "cast(hwm_col as int)",
    ),
)

df = reader.run()
# hwm_expression value is in the dataframe
assert df.columns == ["col1", "col2", "hwm_col"]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",
        # add hwm_column explicitly
        "hwm_col",
    ],
    hwm_column="hwm_col",
)

df = reader.run()
# if columns list is not updated,
# this fill fail
assert df.columns == ["col1", "col2", "hwm_col"]

reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",
        # add hwm_expression explicitly
        "cast(hwm_col as int) as hwm_col",
    ],
    hwm_column=(
        "hwm_col",
        "cast(hwm_col as int)",
    ),
)
df = reader.run()
# if columns list is not updated,
# this fill fail
assert df.columns == ["col1", "col2", "hwm_col"]

Но поскольку большинство пользователей все равно используют columns=["*"], они не увидят никаких изменений.

  • FileDownloader.run() теперь обновляет HWM в HWM Store после обработки всех файлов, а не после загрузки каждого отдельного файла.

Причины изменений:

  • FileDownloader можно использовать с DownloadOptions(workers=N), что может привести к состоянию гонки - один поток может сохранить в HWM store одно значение HWM, а другой -- другое.
  • FileDownloader может загружать сотни и даже тысячи файлов, и отправка запроса в HWM Store для каждого файла может потенциально привести к DDoS HWM Store. (#189)

Обработчик исключений пытается сохранить HWM в HWM store, если процесс загрузки был прерван. Но если он был прерван принудительно, например, отправкой события SIGKILL, HWM не будет сохранен в HWM store, и тогда некоторые уже загруженные файлы могут быть загружены снова.

Кроме того, неожиданное завершение процесса может привести к другим негативным последствиям, например, некоторые файлы будут загружены частично, поэтому мы считаем такую реализацию ожидаемым поведением.

Функциональность

  • Добавлена совместимость с Python 3.12. (#167)
  • Формат файлов Excel теперь можно использовать с Spark 3.5.0. (#187)
  • SnapshotBatchStagy и IncrementalBatchStrategy не вызывают исключений, если источник не содержит данных. Вместо этого они останавливаются на первой итерации и возвращают пустой dataframe. (#188)
  • Добавлено кэширование результата connection.check() в классах высокого уровня, таких как DBReader, FileDownloader и т. д. Это устраняет избыточную информативность логов. (#190)

Исправление ошибок

  • Исправлены декораторы @slot и @hook, возвращающие методы с отсутствующими аргументами в сигнатуре (Pylance, VS Code). (#183)
  • В документации Kafka connector говорилось, что он поддерживает инкрементное чтение данных топика путем передачи group.id или groupIdPrefix. На самом деле это не так, потому что Spark не отправляет в Kafka информацию о том, какие сообщения были получены. Поэтому в настоящее время пользователи могут читать весь топик целиком, а инкрементное чтение не поддерживается.