Skip to content

Выполнение предложений в Greenplum

Warning

Методы, описанные ниже, загружают все строки из БД в память драйвера Spark, а затем конвертируют их в DataFrame.

НЕ используйте их для чтения больших объёмов данных. Вместо этого используйте DBReader.

Как использовать

Существует 2 способа выполнения запросов в Greenplum

Использование Greenplum.fetch

Используйте этот метод для выполнения запросов SELECT, которые возвращают небольшое количество строк, например, для чтения конфигурации Greenplum или данных из справочных таблиц. Метод возвращает Spark DataFrame.

Метод принимает Greenplum.FetchOptions.

Соединение, открытое с помощью этого метода, следует закрыть с помощью connection.close() или конструкции with connection:.

Warning

Greenplum.fetch реализован с использованием JDBC-подключения к Postgres, поэтому типы данных обрабатываются немного иначе, чем в DBReader. См. Типы данных Postgres.

Поддержка синтаксиса в Greenplum.fetch

Этот метод позволяет использовать любой синтаксис запросов, который поддерживается Greenplum, например:

  • ✅︎ SELECT ... FROM ...
  • ✅︎ WITH alias AS (...) SELECT ...
  • ✅︎ SELECT func(arg1, arg2) или {call func(arg1, arg2)} - специальный синтаксис для вызова функций
  • SET ...; SELECT ...; - множественные запросы не поддерживаются

Примеры с Greenplum.fetch

```python
from onetl.connection import Greenplum

greenplum = Greenplum(...)

df = greenplum.fetch(
    "SELECT value FROM some.reference_table WHERE key = 'some_constant'",
    options=Greenplum.FetchOptions(queryTimeout=10),
)
greenplum.close()
value = df.collect()[0][0]  # получить значение из первой строки и первого столбца 
```

Использование Greenplum.execute

Используйте этот метод для выполнения операций DDL и DML. Каждый вызов метода выполняет операцию в отдельной транзакции и затем фиксирует её.

Метод принимает Greenplum.ExecuteOptions.

Соединение, открытое с помощью этого метода, следует закрыть с помощью connection.close() или конструкции with connection:.

Поддержка синтаксиса в Greenplum.execute

Этот метод поддерживает любой синтаксис запросов, который можно использовать в Greenplum, например:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ... и т.д.
  • ✅︎ ALTER ...
  • ✅︎ INSERT INTO ... SELECT ..., UPDATE ..., DELETE ... и т.д.
  • ✅︎ DROP TABLE ..., DROP VIEW ..., TRUNCATE TABLE и т.д.
  • ✅︎ CALL procedure(arg1, arg2) ...
  • ✅︎ SELECT func(arg1, arg2) или {call func(arg1, arg2)} - специальный синтаксис для вызова функций
  • ✅︎ другие запросы, не упомянутые здесь
  • SET ...; SELECT ...; - множественные запросы не поддерживаются

Примеры с Greenplum.execute

```python
from onetl.connection import Greenplum

greenplum = Greenplum(...)

greenplum.execute("DROP TABLE schema.table")
greenplum.execute(
    """
    CREATE TABLE schema.table (
        id int,
        key text,
        value real
    )
    DISTRIBUTED BY id
    """,
    options=Greenplum.ExecuteOptions(queryTimeout=10),
) 
```

Схема взаимодействия

В отличие от чтения и записи, выполнение запросов в Greenplum происходит только через мастер-узел Greenplum без какого-либо взаимодействия между сегментами Greenplum и исполнителями Spark. Более того, исполнители Spark в этом случае не используются.

Единственный порт, используемый при взаимодействии с Greenplum в этом случае — 5432 (порт мастер-узла Greenplum).

Взаимодействие Spark <-> Greenplum при Greenplum.execute()/Greenplum.fetch()
    @startuml
    title Greenplum master <-> Spark driver
            box Spark
            participant "Spark driver"
            end box

            box "Greenplum"
            participant "Greenplum master"
            end box

            == Greenplum.check() ==

            activate "Spark driver"
            "Spark driver" -> "Greenplum master" ++ : CONNECT

            == Greenplum.execute(statement) ==
            "Spark driver" --> "Greenplum master" : EXECUTE statement
            "Greenplum master" -> "Spark driver" : RETURN result

            == Greenplum.close() ==
            "Spark driver" --> "Greenplum master" : CLOSE CONNECTION

            deactivate "Greenplum master"
            deactivate "Spark driver"
    @enduml
    ---
    title: Greenplum master <—> Spark driver
    ---

    sequenceDiagram
        box Spark
        participant A as Spark driver
        end
        box Greenplum
        participant B as Greenplum master
        end

        Note over A,B: == Greenplum.check() ==

        A->>B: CONNECT

        Note over A,B: == Greenplum.execute(statement) ==

        A-->>B: EXECUTE statement
        B-->> A: RETURN result

        Note over A,B: == Greenplum.close() ==

        A ->> B: CLOSE CONNECTION

Опции

onetl.connection.db_connection.greenplum.options.GreenplumFetchOptions

Bases: JDBCFetchOptions

Source code in onetl/connection/db_connection/greenplum/options.py
332
333
class GreenplumFetchOptions(JDBCFetchOptions):
    __doc__ = JDBCFetchOptions.__doc__.replace("SomeDB", "Greenplum")  # type: ignore[assignment, union-attr]

fetchsize = None class-attribute instance-attribute

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

.. warning:: Default value depends on driver. For example, Oracle has default fetchsize=10.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options

onetl.connection.db_connection.greenplum.options.GreenplumExecuteOptions

Bases: JDBCExecuteOptions

Source code in onetl/connection/db_connection/greenplum/options.py
336
337
class GreenplumExecuteOptions(JDBCExecuteOptions):
    __doc__ = JDBCExecuteOptions.__doc__.replace("SomeDB", "Greenplum")  # type: ignore[assignment, union-attr]

fetchsize = None class-attribute instance-attribute

How many rows to fetch per round trip.

Tuning this option can influence performance of reading.

.. warning:: Default value depends on driver. For example, Oracle has default fetchsize=10.

query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on driver implementation, some drivers can check the timeout of each query instead of an entire JDBC batch.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options