Skip to content

Выполнение операторов в Postgres

Warning

Методы ниже считывают все строки, возвращаемые из БД, в память драйвера Spark, а затем преобразуют их в DataFrame.

НЕ используйте их для чтения больших объемов данных. Вместо этого используйте DBReader или Postgres.sql.

Как использовать

Существует 2 способа выполнения операторов в Postgres

Использование Postgres.fetch

Используйте этот метод для выполнения запроса SELECT, который возвращает небольшое количество строк, например, для чтения конфигурации Postgres или данных из справочной таблицы. Метод возвращает Spark DataFrame.

Метод принимает Postgres.FetchOptions.

Соединение, открытое с помощью этого метода, следует затем закрыть с помощью connection.close() или with connection:.

Warning

Пожалуйста, учитывайте типы данных Postgres.

Поддержка синтаксиса в Postgres.fetch

Этот метод поддерживает любой синтаксис запросов, поддерживаемый Postgres, например:

  • ✅︎ SELECT ... FROM ...
  • ✅︎ WITH alias AS (...) SELECT ...
  • SET ...; SELECT ...; - множественные операторы не поддерживаются

Примеры с Postgres.fetch

```python
    from onetl.connection import Postgres

    postgres = Postgres(...)

    df = postgres.fetch(
        "SELECT value FROM some.reference_table WHERE key = 'some_constant'",
        options=Postgres.FetchOptions(queryTimeout=10),
    )
    postgres.close()
    value = df.collect()[0][0]  # получить значение из первой строки и первого столбца
```

Использование Postgres.execute

Используйте этот метод для выполнения операций DDL и DML. Каждый вызов метода выполняет операцию в отдельной транзакции, а затем фиксирует её.

Метод принимает Postgres.ExecuteOptions.

Соединение, открытое с помощью этого метода, следует затем закрыть с помощью connection.close() или with connection:.

Поддержка синтаксиса в Postgres.execute

Этот метод поддерживает любой синтаксис запросов, поддерживаемый Postgres, например:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ... и так далее
  • ✅︎ ALTER ...
  • ✅︎ INSERT INTO ... SELECT ..., UPDATE ..., DELETE ... и так далее
  • ✅︎ DROP TABLE ..., DROP VIEW ..., TRUNCATE TABLE и так далее
  • ✅︎ CALL procedure(arg1, arg2) ...
  • ✅︎ SELECT func(arg1, arg2) или {call func(arg1, arg2)} - специальный синтаксис для вызова функций
  • ✅︎ другие операторы, не упомянутые здесь
  • SET ...; SELECT ...; - множественные операторы не поддерживаются

Примеры с Postgres.execute

```python
    from onetl.connection import Postgres

    postgres = Postgres(...)

    postgres.execute("DROP TABLE schema.table")
    postgres.execute(
        """
        CREATE TABLE schema.table (
            id bigint GENERATED ALWAYS AS IDENTITY,
            key text,
            value real
        )
        """,
        options=Postgres.ExecuteOptions(queryTimeout=10),
    )
```

Параметры

onetl.connection.db_connection.postgres.options.PostgresFetchOptions

Bases: JDBCFetchOptions

Source code in onetl/connection/db_connection/postgres/options.py
27
28
class PostgresFetchOptions(JDBCFetchOptions):
    __doc__ = JDBCFetchOptions.__doc__.replace("SomeDB", "Postgres")  # type: ignore[assignment, union-attr]

fetchsize = None class-attribute instance-attribute

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

.. warning:: Default value depends on driver. For example, Oracle has default fetchsize=10.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options

onetl.connection.db_connection.postgres.options.PostgresExecuteOptions

Bases: JDBCExecuteOptions

Source code in onetl/connection/db_connection/postgres/options.py
31
32
class PostgresExecuteOptions(JDBCExecuteOptions):
    __doc__ = JDBCExecuteOptions.__doc__.replace("SomeDB", "Postgres")  # type: ignore[assignment, union-attr]

fetchsize = None class-attribute instance-attribute

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

.. warning:: Default value depends on driver. For example, Oracle has default fetchsize=10.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options