Skip to content

0.13.0 (2025-02-24)

🎉 3 года с первого релиза 0.1.0 🎉

Критические изменения

  • Добавлена поддержка Python 3.13. (#298)

  • Изменена логика FileConnection.walk и FileConnection.list_dir. (#327)

Ранее limits.stops_at(path) == True выполнялось как "вернуть текущий файл и остановиться", и могло привести к превышению некоторого лимита. Теперь это означает "остановиться немедленно".

  • Изменено значение по умолчанию для FileDFWriter.Options(if_exists=...) с error на append, чтобы соответствовать другим классам .Options() внутри onETL. (#343)

Функциональность

  • Добавлена поддержка для класса HWM FileModifiedTimeHWM (см. etl-entities 2.5.0):
from etl_entitites.hwm import FileModifiedTimeHWM
from onetl.file import FileDownloader
from onetl.strategy import IncrementalStrategy

downloader = FileDownloader(
    ...,
    hwm=FileModifiedTimeHWM(name="somename"),
)

with IncrementalStrategy():
    downloader.run()
  • Добавлен класс фильтра FileSizeRange(min=..., max=...). (#325)

Теперь пользователи могут настроить FileDownloader / FileMover для скачивания/перемещения только файлов с определенным диапазоном их размеров:

from onetl.file import FileDownloader
from onetl.file.filter import FileSizeRange

downloader = FileDownloader(
    ...,
    filters=[FileSizeRange(min="10KiB", max="1GiB")],
)
  • Добавлен класс лимита TotalFilesSize(...). (#326)

Теперь пользователи могут настроить FileDownloader / FileMover для остановки скачивания/перемещения файлов после достижения определенного объема обработанных данных:

from datetime import datetime, timedelta
from onetl.file import FileDownloader
from onetl.file.limit import TotalFilesSize

downloader = FileDownloader(
    ...,
    limits=[TotalFilesSize("1GiB")],
)
  • Реализован фильтр файлов FileModifiedTime(since=..., until=...). (#330)

Теперь пользователи могут настроить FileDownloader / FileMover для скачивания/перемещения только файлов с определенным временем изменения:

from datetime import datetime, timedelta
from onetl.file import FileDownloader
from onetl.file.filter import FileModifiedTime

downloader = FileDownloader(
    ...,
    filters=[FileModifiedTime(before=datetime.now() - timedelta(hours=1))],
)
  • Добавлены методы SparkS3.get_exclude_packages() и Kafka.get_exclude_packages(). (#341)

Их использование позволяет пропустить скачивание зависимостей, не требующихся этим конкретным коннекторам, или таких, которые уже являются частью Spark/PySpark:

from onetl.connection import SparkS3, Kafka

maven_packages = [
    *SparkS3.get_packages(spark_version="3.5.4"),
    *Kafka.get_packages(spark_version="3.5.4"),
]
exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()
spark = (
    SparkSession.builder.appName("spark_app_onetl_demo")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(exclude_packages))
    .getOrCreate()
)

Улучшения

  • Все подключения к БД, открытые JDBC.fetch(...), JDBC.execute(...) или JDBC.check(), немедленно закрываются после выполнения операторов. (#334)

Ранее Spark сессия с master=local[3] фактически открывала до 5 соединений: одно для JDBC.check(), другое для взаимодействия драйвера Spark с БД для создания таблиц, и по одному для каждого Spark executor. Теперь открывается максимум 4 соединения, так как JDBC.check() не удерживает открытое соединение.

Это важно для СУБД, таких как Postgres или Greenplum, где количество соединений строго ограничено, и лимит обычно довольно низкий.

  • Настроен ApplicationName (client info) для Clickhouse, MongoDB, MSSQL, MySQL и Oracle. (#339, #248)

Обновлен формат ApplicationName для Greenplum, Postgres, Kafka и SparkS3. Теперь все коннекторы имеют одинаковый формат ApplicationName: ${spark.applicationId} ${spark.appName} onETL/${onetl.version} Spark/${spark.version}

Единственные подключения, не отправляющие ApplicationName, - это реализации Teradata и FileConnection.

  • Теперь DB.check() будет проверять доступность соединения не только на драйвере Spark, но и на одном из экзекуторов Spark. (#346)

Это позволяет немедленно завершиться с ошибкой, если хост драйвера Spark имеет сетевой доступ к целевой БД, а экзекутор Spark - нет.

Note

Теперь Greenplum.check() требует тех же прав пользователя, что и DBReader(connection=greenplum):

-- yes, "writable" for reading data from GP, it's not a mistake
ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

-- for both reading and writing to GP
-- ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

Пожалуйста, попросите ваших администраторов Greenplum предоставить эти права.

Исправление ошибок

  • Предотвращено подавление ошибок Hive Metastore при использовании DBWriter. (#329)

Ранее это было реализовано как:

try:
    spark.sql(f"SELECT * FROM {table}")
    table_exists = True
except Exception:
    table_exists = False

Если Hive Metastore был перегружен и отвечал исключением, считалось, что таблица не существует и это приводило к ее полному переопределению, вместо добавления или переопределения только подмножества разделов.

  • Исправлено использование onETL для записи данных в экземпляры PostgreSQL или Greenplum, использующих pgbouncer с pool_mode=transaction. (#336)

Ранее Postgres.check() открывал транзакцию только для чтения, pgbouncer изменял весь тип соединения с "чтения-записи" на "только для чтения", и когда DBWriter.run(df) выполнялся в соединении только для чтения, возникали ошибки, такие как:

org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
org.postgresql.util.PSQLException: ERROR: cannot execute TRUNCATE TABLE in a read-only transaction

Добавлено обходное решение путем передачи readOnly=True в параметры JDBC для соединений только для чтения, чтобы pgbouncer мог правильно различать соединения "только для чтения" и "чтения-записи".

После обновления onETL 0.13.x или выше та же ошибка все еще может появляться, если pgbouncer все еще удерживает соединения "только для чтения" и возвращает их для DBWriter. Чтобы это исправить, пользователь может вручную преобразовать соединение "только для чтения" в соединение "чтения-записи":

postgres.execute("BEGIN READ WRITE;")  # <-- add this line
DBWriter(...).run()

После того, как все соединения в пуле pgbouncer были преобразованы из "только для чтения" в "чтение-запись", и ошибка исправлена, эту дополнительную строку можно удалить.

См. Документацию драйвера Postgres JDBC.

  • Исправлены MSSQL.fetch(...) и MySQL.fetch(...), открывавшие соединение "чтения-записи" вместо соединения "только для чтения". (#337)

Теперь:

  • MSSQL.fetch(...) устанавливает соединение с ApplicationIntent=ReadOnly.
  • MySQL.fetch(...) вызывает оператор SET SESSION TRANSACTION READ ONLY.

  • Исправлена передача нескольких фильтров в FileDownloader и FileMover. (#338) Необходимость этого была вызвана сортировкой списка фильтров во внутреннем методе ведения журнала, но подклассы FileFilter не поддерживают сортировку.

  • Исправлено ложное предупреждение о большом количестве параллельных соединений с Grenplum. (#342)

Создание сессии Spark с .master("local[5]") может открыть до 6 соединений с Greenplum (количество экзекуторов Spark + 1 для драйвера), но onETL вместо этого использовала количество ядер CPU на хосте в качестве количества параллельных соединений.

Это приводило к отображению ложного предупреждения о том, что количество соединений Greenplum слишком велико, что на самом деле должно быть только в том случае, если количество экзекуторов превышает 30.

  • Исправлена ошибка, из-за которой подключение к MongoDB пыталось использовать имя указанной базы данных в качестве authSource. (#347)

Теперь используется значение коннектора по умолчанию, т.е. admin. Предыдущие версии onETL можно исправить следующим образом:

from onetl.connection import MongoDB

mongodb = MongoDB(
    ...,
    database="mydb",
    extra={
        "authSource": "admin",
    },
)

Зависимости

  • Минимальная версия etl-entities теперь 2.5.0. (#331)
  • До последних версий обновлены коннекторы/драйверы БД: (#345)
  • Clickhouse 0.6.50.7.2
  • MongoDB 10.4.010.4.1
  • MySQL 9.0.09.2.0
  • Oracle 23.5.0.24.0723.7.0.25.01
  • Postgres 42.7.442.7.5

Изменения только в документации

  • Большие примеры кода разделены на вкладки. (#344)