0.13.0 (2025-02-24)
🎉 3 года с первого релиза 0.1.0 🎉
Критические изменения
-
Добавлена поддержка Python 3.13. (#298)
-
Изменена логика
FileConnection.walkиFileConnection.list_dir. (#327)
Ранее limits.stops_at(path) == True выполнялось как "вернуть текущий файл и остановиться", и могло привести к превышению некоторого лимита. Теперь это означает "остановиться немедленно".
- Изменено значение по умолчанию для
FileDFWriter.Options(if_exists=...)сerrorнаappend, чтобы соответствовать другим классам.Options()внутри onETL. (#343)
Функциональность
- Добавлена поддержка для класса HWM
FileModifiedTimeHWM(см. etl-entities 2.5.0):
from etl_entitites.hwm import FileModifiedTimeHWM
from onetl.file import FileDownloader
from onetl.strategy import IncrementalStrategy
downloader = FileDownloader(
...,
hwm=FileModifiedTimeHWM(name="somename"),
)
with IncrementalStrategy():
downloader.run()
- Добавлен класс фильтра
FileSizeRange(min=..., max=...). (#325)
Теперь пользователи могут настроить FileDownloader / FileMover для скачивания/перемещения только файлов с определенным диапазоном их размеров:
from onetl.file import FileDownloader
from onetl.file.filter import FileSizeRange
downloader = FileDownloader(
...,
filters=[FileSizeRange(min="10KiB", max="1GiB")],
)
- Добавлен класс лимита
TotalFilesSize(...). (#326)
Теперь пользователи могут настроить FileDownloader / FileMover для остановки скачивания/перемещения файлов после достижения определенного объема обработанных данных:
from datetime import datetime, timedelta
from onetl.file import FileDownloader
from onetl.file.limit import TotalFilesSize
downloader = FileDownloader(
...,
limits=[TotalFilesSize("1GiB")],
)
- Реализован фильтр файлов
FileModifiedTime(since=..., until=...). (#330)
Теперь пользователи могут настроить FileDownloader / FileMover для скачивания/перемещения только файлов с определенным временем изменения:
from datetime import datetime, timedelta
from onetl.file import FileDownloader
from onetl.file.filter import FileModifiedTime
downloader = FileDownloader(
...,
filters=[FileModifiedTime(before=datetime.now() - timedelta(hours=1))],
)
- Добавлены методы
SparkS3.get_exclude_packages()иKafka.get_exclude_packages(). (#341)
Их использование позволяет пропустить скачивание зависимостей, не требующихся этим конкретным коннекторам, или таких, которые уже являются частью Spark/PySpark:
from onetl.connection import SparkS3, Kafka
maven_packages = [
*SparkS3.get_packages(spark_version="3.5.4"),
*Kafka.get_packages(spark_version="3.5.4"),
]
exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()
spark = (
SparkSession.builder.appName("spark_app_onetl_demo")
.config("spark.jars.packages", ",".join(maven_packages))
.config("spark.jars.excludes", ",".join(exclude_packages))
.getOrCreate()
)
Улучшения
- Все подключения к БД, открытые
JDBC.fetch(...),JDBC.execute(...)илиJDBC.check(), немедленно закрываются после выполнения операторов. (#334)
Ранее Spark сессия с master=local[3] фактически открывала до 5 соединений: одно для JDBC.check(), другое для взаимодействия драйвера Spark с БД для создания таблиц, и по одному для каждого Spark executor. Теперь открывается максимум 4 соединения, так как JDBC.check() не удерживает открытое соединение.
Это важно для СУБД, таких как Postgres или Greenplum, где количество соединений строго ограничено, и лимит обычно довольно низкий.
Обновлен формат ApplicationName для Greenplum, Postgres, Kafka и SparkS3.
Теперь все коннекторы имеют одинаковый формат ApplicationName: ${spark.applicationId} ${spark.appName} onETL/${onetl.version} Spark/${spark.version}
Единственные подключения, не отправляющие ApplicationName, - это реализации Teradata и FileConnection.
- Теперь
DB.check()будет проверять доступность соединения не только на драйвере Spark, но и на одном из экзекуторов Spark. (#346)
Это позволяет немедленно завершиться с ошибкой, если хост драйвера Spark имеет сетевой доступ к целевой БД, а экзекутор Spark - нет.
Note
Теперь Greenplum.check() требует тех же прав пользователя, что и DBReader(connection=greenplum):
-- yes, "writable" for reading data from GP, it's not a mistake
ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
-- for both reading and writing to GP
-- ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
Пожалуйста, попросите ваших администраторов Greenplum предоставить эти права.
Исправление ошибок
- Предотвращено подавление ошибок Hive Metastore при использовании
DBWriter. (#329)
Ранее это было реализовано как:
try:
spark.sql(f"SELECT * FROM {table}")
table_exists = True
except Exception:
table_exists = False
Если Hive Metastore был перегружен и отвечал исключением, считалось, что таблица не существует и это приводило к ее полному переопределению, вместо добавления или переопределения только подмножества разделов.
- Исправлено использование onETL для записи данных в экземпляры PostgreSQL или Greenplum, использующих pgbouncer с
pool_mode=transaction. (#336)
Ранее Postgres.check() открывал транзакцию только для чтения, pgbouncer изменял весь тип соединения с "чтения-записи" на "только для чтения", и когда DBWriter.run(df) выполнялся в соединении только для чтения, возникали ошибки, такие как:
org.postgresql.util.PSQLException: ERROR: cannot execute INSERT in a read-only transaction
org.postgresql.util.PSQLException: ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
Добавлено обходное решение путем передачи readOnly=True в параметры JDBC для соединений только для чтения, чтобы pgbouncer мог правильно различать соединения "только для чтения" и "чтения-записи".
После обновления onETL 0.13.x или выше та же ошибка все еще может появляться, если pgbouncer все еще удерживает соединения "только для чтения" и возвращает их для DBWriter. Чтобы это исправить, пользователь может вручную преобразовать соединение "только для чтения" в соединение "чтения-записи":
postgres.execute("BEGIN READ WRITE;") # <-- add this line
DBWriter(...).run()
После того, как все соединения в пуле pgbouncer были преобразованы из "только для чтения" в "чтение-запись", и ошибка исправлена, эту дополнительную строку можно удалить.
См. Документацию драйвера Postgres JDBC.
- Исправлены
MSSQL.fetch(...)иMySQL.fetch(...), открывавшие соединение "чтения-записи" вместо соединения "только для чтения". (#337)
Теперь:
MSSQL.fetch(...)устанавливает соединение сApplicationIntent=ReadOnly.-
MySQL.fetch(...)вызывает операторSET SESSION TRANSACTION READ ONLY. -
Исправлена передача нескольких фильтров в
FileDownloaderиFileMover. (#338) Необходимость этого была вызвана сортировкой списка фильтров во внутреннем методе ведения журнала, но подклассыFileFilterне поддерживают сортировку. -
Исправлено ложное предупреждение о большом количестве параллельных соединений с Grenplum. (#342)
Создание сессии Spark с .master("local[5]") может открыть до 6 соединений с Greenplum (количество экзекуторов Spark + 1 для драйвера), но onETL вместо этого использовала количество ядер CPU на хосте в качестве количества параллельных соединений.
Это приводило к отображению ложного предупреждения о том, что количество соединений Greenplum слишком велико, что на самом деле должно быть только в том случае, если количество экзекуторов превышает 30.
- Исправлена ошибка, из-за которой подключение к MongoDB пыталось использовать имя указанной базы данных в качестве
authSource. (#347)
Теперь используется значение коннектора по умолчанию, т.е. admin. Предыдущие версии onETL можно исправить следующим образом:
from onetl.connection import MongoDB
mongodb = MongoDB(
...,
database="mydb",
extra={
"authSource": "admin",
},
)
Зависимости
- Минимальная версия
etl-entitiesтеперь 2.5.0. (#331) - До последних версий обновлены коннекторы/драйверы БД: (#345)
- Clickhouse
0.6.5→0.7.2 - MongoDB
10.4.0→10.4.1 - MySQL
9.0.0→9.2.0 - Oracle
23.5.0.24.07→23.7.0.25.01 - Postgres
42.7.4→42.7.5
Изменения только в документации
- Большие примеры кода разделены на вкладки. (#344)