Skip to content

Чтение из Postgres с использованием Postgres.sql

Postgres.sql позволяет передавать пользовательский SQL-запрос, но не поддерживает инкрементальные стратегии.

Warning

Пожалуйста, учитывайте типы данных Postgres

Warning

Запрос выполняется в соединении с режимом чтение-запись, поэтому если вы вызываете функции/процедуры с DDL/DML-операторами внутри, они могут изменить данные в вашей базе данных.

Поддержка синтаксиса

Поддерживаются только запросы со следующим синтаксисом:

  • ✅︎ SELECT ... FROM ...
  • ✅︎ WITH alias AS (...) SELECT ...
  • SET ...; SELECT ...; - несколько операторов не поддерживаются

Примеры

```python
    from onetl.connection import Postgres

    postgres = Postgres(...)
    df = postgres.sql(
        """
        SELECT
            id,
            key,
            CAST(value AS text) value,
            updated_at
        FROM
            some.mytable
        WHERE
            key = 'something'
        """,
        options=Postgres.SQLOptions(
            partitionColumn="id",
            numPartitions=10,
            lowerBound=0,
            upperBound=1000,
        ),
    )
```

Рекомендации

Выбирайте только необходимые столбцы

Вместо использования SELECT * FROM ... предпочтительнее указывать точные имена столбцов SELECT col1, col2, .... Это уменьшает объем данных, передаваемых из Postgres в Spark.

Обращайте внимание на значения в условии where

Вместо фильтрации данных на стороне Spark с помощью df.filter(df.column == 'value') передавайте соответствующее условие WHERE column = 'value'. Это не только уменьшает объем данных, передаваемых из Postgres в Spark, но также может улучшить производительность запроса. Особенно если для столбцов, используемых в условии where, есть индексы или секционирование.

Опции

onetl.connection.db_connection.postgres.options.PostgresSQLOptions

Bases: JDBCSQLOptions

Source code in onetl/connection/db_connection/postgres/options.py
23
24
class PostgresSQLOptions(JDBCSQLOptions):
    __doc__ = JDBCSQLOptions.__doc__.replace("SomeDB", "Postgres")  # type: ignore[assignment, union-attr]

fetchsize = 100000 class-attribute instance-attribute

Fetch N rows from an opened cursor per one read round.

Tuning this option can influence performance of reading.

.. warning::

Default value is different from Spark.

Spark uses driver's own value, and it may be different in different drivers,
and even versions of the same driver. For example, Oracle has
default ``fetchsize=10``, which is absolutely not usable.

Thus we've overridden default value with ``100_000``, which should increase reading performance.

.. versionchanged:: 0.2.0 Set explicit default value to 100_000

lower_bound = Field(default=None, alias='lowerBound') class-attribute instance-attribute

Defines the starting boundary for partitioning the query's data. Mandatory if :obj:~partition_column is set

num_partitions = Field(default=None, alias='numPartitions') class-attribute instance-attribute

Number of jobs created by Spark to read the table content in parallel.

partition_column = Field(default=None, alias='partitionColumn') class-attribute instance-attribute

Column used to partition data across multiple executors for parallel query processing.

.. warning:: It is highly recommended to use primary key, or column with an index to avoid performance issues.

.. dropdown:: Example of using partitionColumn="id" with partitioning_mode="range"

.. code-block:: sql

    -- If partition_column is 'id', with numPartitions=4, lowerBound=1, and upperBound=100:
    -- Executor 1 processes IDs from 1 to 25
    SELECT ... FROM table WHERE id >= 1 AND id < 26
    -- Executor 2 processes IDs from 26 to 50
    SELECT ... FROM table WHERE id >= 26 AND id < 51
    -- Executor 3 processes IDs from 51 to 75
    SELECT ... FROM table WHERE id >= 51 AND id < 76
    -- Executor 4 processes IDs from 76 to 100
    SELECT ... FROM table WHERE id >= 76 AND id <= 100


    -- General case for Executor N
    SELECT ... FROM table
    WHERE partition_column >= (lowerBound + (N-1) * stride)
    AND partition_column <= upperBound
    -- Where ``stride`` is calculated as ``(upperBound - lowerBound) / numPartitions``.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

session_init_statement = Field(default=None, alias='sessionInitStatement') class-attribute instance-attribute

After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).

Use this to implement session initialization code.

Example:

.. code:: python

sessionInitStatement = """
    BEGIN
        execute immediate
        'alter session set "_serial_direct_read"=true';
    END;
"""

upper_bound = Field(default=None, alias='upperBound') class-attribute instance-attribute

Sets the ending boundary for data partitioning. Mandatory if :obj:~partition_column is set

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options