Skip to content

Чтение из Clickhouse с использованием DBReader

DBReader поддерживает стратегии для инкрементального чтения данных, но не поддерживает пользовательские запросы, например, включающие JOIN.

Warning

Пожалуйста, учитывайте типы данных Сlickhouse

Поддерживаемые функции DBReader

  • ✅︎ columns
  • ✅︎ where
  • ✅︎ hwm, поддерживаемые стратегии:
  • ✅︎ Snapshot
  • ✅︎ Incremental
  • ✅︎ Snapshot batch
  • ✅︎ Incremental batch
  • hint (не поддерживается Clickhouse)
  • df_schema
  • ✅︎ options (см. Clickhouse.ReadOptions)

Примеры

Стратегия Snapshot

from onetl.connection import Clickhouse
from onetl.db import DBReader

clickhouse = Clickhouse(...)

reader = DBReader(
    connection=clickhouse,
    source="schema.table",
    columns=["id", "key", "CAST(value AS String) value", "updated_dt"],
    where="key = 'something'",
    options=Clickhouse.ReadOptions(partitionColumn="id", numPartitions=10),
)
df = reader.run()

Стратегия Incremental

from onetl.connection import Clickhouse
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

clickhouse = Clickhouse(...)

reader = DBReader(
    connection=clickhouse,
    source="schema.table",
    columns=["id", "key", "CAST(value AS String) value", "updated_dt"],
    where="key = 'something'",
    hwm=DBReader.AutoDetectHWM(name="clickhouse_hwm", expression="updated_dt"),
    options=Clickhouse.ReadOptions(partitionColumn="id", numPartitions=10),
)

with IncrementalStrategy():
    df = reader.run()

Рекомендации

Выбирайте только необходимые столбцы

Вместо передачи "*" в DBReader(columns=[...]) предпочтительнее передавать точные имена столбцов. Это уменьшает объем данных, передаваемых из Clickhouse в Spark.

Обратите внимание на значение where

Вместо фильтрации данных на стороне Spark с помощью df.filter(df.column == 'value') передавайте соответствующее условие DBReader(where="column = 'value'"). Это не только уменьшает объем данных, передаваемых из Clickhouse в Spark, но и может улучшить производительность запроса. Особенно если есть индексы или в условии where используются столбцы партиционирования.

Опции

onetl.connection.db_connection.clickhouse.options.ClickhouseReadOptions

Bases: JDBCReadOptions

Source code in onetl/connection/db_connection/clickhouse/options.py
15
16
class ClickhouseReadOptions(JDBCReadOptions):
    __doc__ = JDBCReadOptions.__doc__.replace("SomeDB", "Clickhouse")  # type: ignore[assignment, union-attr]

fetchsize = 100000 class-attribute instance-attribute

Fetch N rows from an opened cursor per one read round.

Tuning this option can influence performance of reading.

.. warning::

Default value is different from Spark.

Spark uses driver's own value, and it may be different in different drivers,
and even versions of the same driver. For example, Oracle has
default ``fetchsize=10``, which is absolutely not usable.

Thus we've overridden default value with ``100_000``, which should increase reading performance.

.. versionchanged:: 0.2.0 Set explicit default value to 100_000

lower_bound = Field(default=None, alias='lowerBound') class-attribute instance-attribute

See documentation for :obj:~partitioning_mode for more details

num_partitions = Field(default=1, alias='numPartitions') class-attribute instance-attribute

Number of jobs created by Spark to read the table content in parallel. See documentation for :obj:~partitioning_mode for more details

partition_column = Field(default=None, alias='partitionColumn') class-attribute instance-attribute

Column used to parallelize reading from a table.

.. warning:: It is highly recommended to use primary key, or column with an index to avoid performance issues.

.. note:: Column type depends on :obj:~partitioning_mode.

* ``partitioning_mode="range"`` requires column to be an integer, date or timestamp (can be NULL, but not recommended).
* ``partitioning_mode="hash"`` accepts any column type (NOT NULL).
* ``partitioning_mode="mod"`` requires column to be an integer (NOT NULL).

See documentation for :obj:~partitioning_mode for more details

partitioning_mode = JDBCPartitioningMode.RANGE class-attribute instance-attribute

Defines how Spark will parallelize reading from table.

Possible values:

  • range (default) Allocate each executor a range of values from column passed into :obj:~partition_column.

    .. dropdown:: Spark generates for each executor an SQL query

    Executor 1:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (partition_column >= lowerBound
                OR partition_column IS NULL)
        AND partition_column < (lower_bound + stride)
    
    Executor 2:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE partition_column >= (lower_bound + stride)
        AND partition_column < (lower_bound + 2 * stride)
    
    ...
    
    Executor N:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE partition_column >= (lower_bound + (N-1) * stride)
        AND partition_column <= upper_bound
    
    Where ``stride=(upper_bound - lower_bound) / num_partitions``.
    

    .. note::

    Can be used only with columns of integer, date or timestamp types.
    

    .. note::

    :obj:`~lower_bound`, :obj:`~upper_bound` and :obj:`~num_partitions` are used just to
    calculate the partition stride, **NOT** for filtering the rows in table.
    So all rows in the table will be returned (unlike *Incremental* :ref:`strategy`).
    

    .. note::

    All queries are executed in parallel. To execute them sequentially, use *Batch* :ref:`strategy`.
    
  • hash Allocate each executor a set of values based on hash of the :obj:~partition_column column.

    .. dropdown:: Spark generates for each executor an SQL query

    Executor 1:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (some_hash(partition_column) mod num_partitions) = 0 -- lower_bound
    
    Executor 2:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (some_hash(partition_column) mod num_partitions) = 1 -- lower_bound + 1
    
    ...
    
    Executor N:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (some_hash(partition_column) mod num_partitions) = num_partitions-1 -- upper_bound
    

    .. note::

    The hash function implementation depends on RDBMS. It can be ``MD5`` or any other fast hash function,
    or expression based on this function call. Usually such functions accepts any column type as an input.
    
  • mod Allocate each executor a set of values based on modulus of the :obj:~partition_column column.

    .. dropdown:: Spark generates for each executor an SQL query

    Executor 1:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (partition_column mod num_partitions) = 0 -- lower_bound
    
    Executor 2:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (partition_column mod num_partitions) = 1 -- lower_bound + 1
    
    Executor N:
    
    .. code:: sql
    
        SELECT ... FROM table
        WHERE (partition_column mod num_partitions) = num_partitions-1 -- upper_bound
    

    .. note::

    Can be used only with columns of integer type.
    

.. versionadded:: 0.5.0

Examples

Read data in 10 parallel jobs by range of values in id_column column:

.. code:: python

ReadOptions(
    partitioning_mode="range",  # default mode, can be omitted
    partitionColumn="id_column",
    numPartitions=10,
    # Options below can be discarded because they are
    # calculated automatically as MIN and MAX values of `partitionColumn`
    lowerBound=0,
    upperBound=100_000,
)

Read data in 10 parallel jobs by hash of values in some_column column:

.. code:: python

ReadOptions(
    partitioning_mode="hash",
    partitionColumn="some_column",
    numPartitions=10,
    # lowerBound and upperBound are automatically set to `0` and `9`
)

Read data in 10 parallel jobs by modulus of values in id_column column:

.. code:: python

ReadOptions(
    partitioning_mode="mod",
    partitionColumn="id_column",
    numPartitions=10,
    # lowerBound and upperBound are automatically set to `0` and `9`
)

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

session_init_statement = Field(default=None, alias='sessionInitStatement') class-attribute instance-attribute

After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).

Use this to implement session initialization code.

Example:

.. code:: python

sessionInitStatement = """
    BEGIN
        execute immediate
        'alter session set "_serial_direct_read"=true';
    END;
"""

upper_bound = Field(default=None, alias='upperBound') class-attribute instance-attribute

See documentation for :obj:~partitioning_mode for more details

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options