Skip to content

Выполнение запросов в Clickhouse

Warning

Методы, описанные ниже, загружают все строки, возвращаемые из БД, в память драйвера Spark, а затем преобразуют их в DataFrame.

НЕ используйте их для чтения больших объемов данных. Вместо этого используйте DBReader или Clickhouse.sql.

Как использовать

Есть два способа выполнить запрос в Clickhouse:

Использование Clickhouse.fetch

Используйте этот метод для выполнения SELECT-запросов, которые возвращают небольшое количество строк, например, чтение конфигурации Clickhouse или получение данных из справочной таблицы. Метод возвращает Spark DataFrame.

Метод принимает Clickhouse.FetchOptions.

Соединение, открытое с использованием этого метода, должно быть закрыто через connection.close() или with connection:.

Warning

Пожалуйста, учтите типы данных Clickhouse.

Поддержка синтаксиса в Clickhouse.fetch

Этот метод позволяет использовать любой синтаксис запросов, поддерживаемый Clickhouse, например:

  • ✅︎ SELECT ... FROM ...
  • ✅︎ WITH alias AS (...) SELECT ...
  • ✅︎ SELECT func(arg1, arg2) — вызов функции
  • ✅︎ SHOW ...
  • SET ...; SELECT ...; — множественные запросы не поддерживаются

Примеры с Clickhouse.fetch

from onetl.connection import Clickhouse

clickhouse = Clickhouse(...)

df = clickhouse.fetch(
    "SELECT value FROM some.reference_table WHERE key = 'some_constant'",
    options=Clickhouse.FetchOptions(queryTimeout=10),
)
clickhouse.close()
value = df.collect()[0][0]  # получаем значение из первой строки и первого столбца

Использование Clickhouse.execute

Используйте этот метод для выполнения операций DDL (определение данных) и DML (манипуляция данными). Каждый вызов метода выполняет операцию в отдельной транзакции, после чего она фиксируется.

Метод принимает Clickhouse.ExecuteOptions.

Соединение, открытое с использованием этого метода, должно быть закрыто через connection.close() или with connection:.

Поддержка синтаксиса в Clickhouse.execute

Этот метод поддерживает любой синтаксис запросов, используемый Clickhouse, например:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ..., и т.д.
  • ✅︎ ALTER ...
  • ✅︎ INSERT INTO ... SELECT ..., UPDATE ..., DELETE ..., и т.д.
  • ✅︎ DROP TABLE ..., DROP VIEW ..., TRUNCATE TABLE, и т.д.
  • ✅︎ другие утверждения, не упомянутые здесь
  • SET ...; SELECT ...; — множественные запросы не поддерживаются

Примеры с Clickhouse.execute

from onetl.connection import Clickhouse

clickhouse = Clickhouse(...)

clickhouse.execute("DROP TABLE schema.table")
clickhouse.execute(
    """
    CREATE TABLE schema.table (
        id UInt8,
        key String,
        value Float32
    )
    ENGINE = MergeTree()
    ORDER BY id
    """,
    options=Clickhouse.ExecuteOptions(queryTimeout=10),
)

Примечания

Эти методы прочитывают все строки, возвращаемые из БД, в память драйвера Spark, а затем преобразуют их в DataFrame.

Поэтому их не следует использовать для чтения больших объемов данных. Вместо этого используйте DBReader или Clickhouse.sql.

Опции

onetl.connection.db_connection.clickhouse.options.ClickhouseFetchOptions

Bases: JDBCFetchOptions

Source code in onetl/connection/db_connection/clickhouse/options.py
27
28
class ClickhouseFetchOptions(JDBCFetchOptions):
    __doc__ = JDBCFetchOptions.__doc__.replace("SomeDB", "Clickhouse")  # type: ignore[assignment, union-attr]

fetchsize = None class-attribute instance-attribute

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

.. warning:: Default value depends on driver. For example, Oracle has default fetchsize=10.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options

onetl.connection.db_connection.clickhouse.options.ClickhouseExecuteOptions

Bases: JDBCExecuteOptions

Source code in onetl/connection/db_connection/clickhouse/options.py
31
32
class ClickhouseExecuteOptions(JDBCExecuteOptions):
    __doc__ = JDBCExecuteOptions.__doc__.replace("SomeDB", "Clickhouse")  # type: ignore[assignment, union-attr]

fetchsize = None class-attribute instance-attribute

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

.. warning:: Default value depends on driver. For example, Oracle has default fetchsize=10.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options