0.11.0 (2024-05-27)
Критические изменения
Могут быть некоторые изменения в поведении подключения, связанные с обновлениями версий. Поэтому мы отмечаем такие изменения как breaking, хотя большинство пользователей не увидят никаких различий.
- Обновление JDBC драйвера Clickhouse до последней версии (#249):
- Пакет был переименован
ru.yandex.clickhouse:clickhouse-jdbc→com.clickhouse:clickhouse-jdbc. - Версия пакета изменена
0.3.2→0.6.0-patch5. - Имя драйвера изменено
ru.yandex.clickhouse.ClickHouseDriver→com.clickhouse.jdbc.ClickHouseDriver.
Это приводит к необходимости нескольких изменений для совместимости типов Spark <-> Clickhouse, а также поддержки кластеров Clickhouse.
- Обновление других JDBC драйверов до последних версий:
- MSSQL
12.2.0→12.6.2(#254). - MySQL
8.0.33→8.4.0(#253, #285). - Oracle
23.2.0.0→23.4.0.24.05(#252, #284). -
Postgres
42.6.0→42.7.3(#251). -
Обновление коннектора MongoDB до последней версии:
10.1.1→10.3.0(#255, #283).
Благодаря которому появляется поддержка Spark 3.5.
- Обновление пакета
XMLдо последней версии:0.17.0→0.18.0(#259).
Которое устраняет несколько ошибок обработки формата datetime.
- Для JDBC соединений добавлен новый класс
SQLOptionsдля методаDB.sql(query, options=...)(#272).
Благодаря этому:
Во-первых, лучше поддерживается согласованность именования.
Во-вторых, некоторые опции не поддерживаются методом DB.sql(...), но поддерживаются DBReader.
Например, SQLOptions не поддерживает partitioning_mode и требует явного определения lower_bound и upper_bound, когда num_partitions больше 1.
ReadOptions поддерживает partitioning_mode и позволяет пропускать значения lower_bound и upper_bound.
Это требует некоторых изменений в коде. Раньше:
from onetl.connection import Postgres
postgres = Postgres(...)
df = postgres.sql(
"""
SELECT *
FROM some.mytable
WHERE key = 'something'
""",
options=Postgres.ReadOptions(
partitioning_mode="range",
partition_column="id",
num_partitions=10,
),
)
Теперь:
from onetl.connection import Postgres
postgres = Postgres(...)
df = postgres.sql(
"""
SELECT *
FROM some.mytable
WHERE key = 'something'
""",
options=Postgres.SQLOptions(
# partitioning_mode is not supported!
partition_column="id",
num_partitions=10,
lower_bound=0, # <-- set explicitly
upper_bound=1000, # <-- set explicitly
),
)
На данный момент DB.sql(query, options=...) может принимать ReadOptions для сохранения обратной совместимости, но выдает предупреждение об устаревании.
Поддержка будет удалена в v1.0.0.
- Разделение класса
JDBCOptionsнаFetchOptionsиExecuteOptions(#274).
Новые классы используются методами DB.fetch(query, options=...) и DB.execute(query, options=...) соответственно.
Это изменение необходимо в основном для большей консистентности именования.
Оно также требует некоторых изменений в коде. Раньше:
from onetl.connection import Postgres
postgres = Postgres(...)
df = postgres.fetch(
"SELECT * FROM some.mytable WHERE key = 'something'",
options=Postgres.JDBCOptions(
fetchsize=1000,
query_timeout=30,
),
)
postgres.execute(
"UPDATE some.mytable SET value = 'new' WHERE key = 'something'",
options=Postgres.JDBCOptions(query_timeout=30),
)
Теперь:
from onetl.connection import Postgres
# Using FetchOptions for fetching data
postgres = Postgres(...)
df = postgres.fetch(
"SELECT * FROM some.mytable WHERE key = 'something'",
options=Postgres.FetchOptions( # <-- change class name
fetchsize=1000,
query_timeout=30,
),
)
# Using ExecuteOptions for executing statements
postgres.execute(
"UPDATE some.mytable SET value = 'new' WHERE key = 'something'",
options=Postgres.ExecuteOptions(query_timeout=30), # <-- change class name
)
На данный момент DB.fetch(query, options=...) и DB.execute(query, options=...) могут принимать JDBCOptions для сохранения обратной совместимости, но выдают предупреждение об устаревании. Старый класс будет удален в v1.0.0.
- Сериализация
ColumnDatetimeHWMв типDateTime64(6)Clickhouse (точность до микросекунд) вместоDateTime(точность до секунд) (#267).
В предыдущих версиях onETL значение ColumnDatetimeHWM округлялось до секунды, и, таким образом, чтение некоторых строк, которые были прочитаны раньше, приводило к дублированию данных.
Для версий Clickhouse ниже 21.1 сравнение столбца типа DateTime со значением типа DateTime64 не поддерживается, возвращая пустой dataframe.
Чтобы избежать этого, замените:
DBReader(
...,
hwm=DBReader.AutoDetectHWM(
name="my_hwm",
expression="hwm_column", # <--
),
)
на:
DBReader(
...,
hwm=DBReader.AutoDetectHWM(
name="my_hwm",
expression="CAST(hwm_column AS DateTime64)", # <-- add explicit CAST
),
)
- Передача дополнительных параметров подключения JDBC как словаря
propertiesвместо URL с "query part" (#268).
Это позволяет передавать пользовательские параметры соединения, такие как Clickhouse(extra={"custom_http_options": "option1=value1,option2=value2"}) без необходимости применять urlencode к значению параметра, например option1%3Dvalue1%2Coption2%3Dvalue2.
Функциональность
Улучшение пользовательского опыта при работе с сообщениями Kafka и таблицами баз данных, содержащими сериализованные столбцы, такие как JSON или XML.
- Разрешена передача пользовательской версии пакета в качестве аргумента для метода
DB.get_packages(...)нескольких коннекторах к БД: Clickhouse.get_packages(package_version=..., apache_http_client_version=...)(#249).MongoDB.get_packages(scala_version=..., spark_version=..., package_version=...)(#255).MySQL.get_packages(package_version=...)(#253).MSSQL.get_packages(java_version=..., package_version=...)(#254).Oracle.get_packages(java_version=..., package_version=...)(#252).Postgres.get_packages(package_version=...)(#251).Teradata.get_packages(package_version=...)(#256). Теперь пользователи могут понижать или повышать версию подключения, не дожидаясь следующего релиза onETL. Раньше такую возможность поддерживали толькоKafkaиGreenplum.- Добавлен метод
FileFormat.parse_column(...)в несколько классов: Avro.parse_column(col)(#265).JSON.parse_column(col, schema=...)(#257).CSV.parse_column(col, schema=...)(#258).XML.parse_column(col, schema=...)(#269). Это позволяет анализировать данные в полеvalueсообщения Kafka или строковый/двоичный столбец некоторой таблицы как вложенную структуру Spark.- Добавлен метод
FileFormat.serialize_column(...)в несколько классов: Avro.serialize_column(col)(#265).JSON.serialize_column(col)(#257).CSV.serialize_column(col)(#258). Это позволяет сохранять вложенные структуры Spark или массивы в полеvalueсообщения Kafka или строковый/двоичный столбец некоторой таблицы.
Улучшения
Несколько улучшений документации.
assertзаменены синтаксисом doctest. Это должно сделать документацию более читаемой (#273).- Добавлено общее руководство
Troubleshooting(#275). - Улучшена документация Kafka:
- Добавлена страница "Prerequisites", описывающая различные аспекты подключения к Kafka.
- Дополнительные примеры и примечания добавлены на страницы "Reading from" и "Writing to" документации Kafka.
- Добавлена страница "Troubleshooting" (#276).
- Улучшена документация Hive:
- Добавлена страница "Prerequisites", описывающая различные аспекты подключения к Hive.
- Дополнительные примеры и примечания добавлены на страницы "Reading from" и "Writing to" документации Hive.
- Страница "Executing statements in Hive" документации Hive стала более содержательной. (#278).
- Добавлена страница "Prerequisites", описывающая различные аспекты использования коннекторов SparkHDFS и SparkS3. (#279).
- Добавлены примечания о подключении к кластеру Clickhouse. (#280).
- Добавлены примечания о версиях, когда был добавлен, переименован или изменен класс/метод/атрибут/аргумент (#282).
Исправление ошибок
- После установки
pip install onetl[files]ранее отсутствовал пакетpysmb. Теперь этот недостаток устранен.