Чтение данных из Iceberg с помощью DBReader
[DBReader][DB-onetl-db-reader] поддерживает [strategy][DB-onetl-strategy-read-strategies] Используется для инкрементального чтения данных, но не поддерживает пользовательские запросы, такие как JOIN.
Поддерживаемые функции DBReader
- ✅︎
columns - ✅︎
where - ✅︎
hwm, поддерживаемые стратегии: - ✅︎ [
snapshot-strategy][DB-onetl-strategy-snapshot-strategy] - ✅︎ [
incremental-strategy][DB-onetl-connection-db-connection-clickhouse-read-incremental-strategy] - ✅︎ [
snapshot-batch-strategy][DBRonetl-strategy-snapshot-batch-strategy] - ✅︎ [
incremental-batch-strategy][DB-onetl-strategy-incremental-batch-strategy] - ✅︎
hint - ❌
df_schema - ❌
options(Используются только параметры конфигурации Spark.)
Warning
Значения columns, where и hwm.expression следует записывать с использованием синтаксиса SparkSQL.
Примеры
Стратегия создания моментальных снимков:
from onetl.connection import Iceberg
from onetl.db import DBReader
iceberg = Iceberg(catalog_name="my_catalog", ...)
reader = DBReader(
connection=iceberg,
source="my_schema.table", # catalog is already defined in connection
columns=["id", "key", "value", "updated_dt"],
where="key = 'something'",
)
df = reader.run()
Стратегия поэтапного внедрения:
from onetl.connection import Iceberg
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
iceberg = Iceberg(catalog_name="my_catalog", ...)
reader = DBReader(
connection=iceberg,
source="my_schema.table", # catalog is already defined in connection
columns=["id", "key", "value", "updated_dt"],
where="key = 'something'",
hwm=DBReader.AutoDetectHWM(name="iceberg_hwm", expression="updated_dt"),
)
with IncrementalStrategy():
df = reader.run()
Рекомендации
Выберите только необходимые столбцы
Вместо передачи "*" в DBReader(columns=[...]) лучше передавать точные имена столбцов. Это значительно уменьшает объем данных, считываемых Spark.