Skip to content

Чтение данных из Iceberg с помощью DBReader

[DBReader][DB-onetl-db-reader] поддерживает [strategy][DB-onetl-strategy-read-strategies] Используется для инкрементального чтения данных, но не поддерживает пользовательские запросы, такие как JOIN.

Поддерживаемые функции DBReader

  • ✅︎ columns
  • ✅︎ where
  • ✅︎ hwm, поддерживаемые стратегии:
  • ✅︎ [snapshot-strategy][DB-onetl-strategy-snapshot-strategy]
  • ✅︎ [incremental-strategy][DB-onetl-connection-db-connection-clickhouse-read-incremental-strategy]
  • ✅︎ [snapshot-batch-strategy][DBRonetl-strategy-snapshot-batch-strategy]
  • ✅︎ [incremental-batch-strategy][DB-onetl-strategy-incremental-batch-strategy]
  • ✅︎ hint
  • df_schema
  • options (Используются только параметры конфигурации Spark.)

Warning

Значения columns, where и hwm.expression следует записывать с использованием синтаксиса SparkSQL.

Примеры

Стратегия создания моментальных снимков:

from onetl.connection import Iceberg
from onetl.db import DBReader

iceberg = Iceberg(catalog_name="my_catalog", ...)

reader = DBReader(
    connection=iceberg,
    source="my_schema.table",  # catalog is already defined in connection
    columns=["id", "key", "value", "updated_dt"],
    where="key = 'something'",
)
df = reader.run()

Стратегия поэтапного внедрения:

from onetl.connection import Iceberg
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

iceberg = Iceberg(catalog_name="my_catalog", ...)

reader = DBReader(
    connection=iceberg,
    source="my_schema.table",  # catalog is already defined in connection
    columns=["id", "key", "value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="iceberg_hwm", expression="updated_dt"),
)

with IncrementalStrategy():
    df = reader.run()

Рекомендации

Выберите только необходимые столбцы

Вместо передачи "*" в DBReader(columns=[...]) лучше передавать точные имена столбцов. Это значительно уменьшает объем данных, считываемых Spark.