Skip to content

Чтение из Greenplum с использованием DBReader

Данные могут быть прочитаны из Greenplum в Spark с использованием DBReader. Он также поддерживает стратегии для инкрементального чтения данных.

Warning

Пожалуйста, учитывайте типы данных Greenplum.

Note

В отличие от коннекторов JDBC, Greenplum connector for Spark не поддерживает выполнение пользовательских SQL-запросов с использованием метода .sql. Коннектор может использоваться только для чтения данных из таблицы или представления.

Поддерживаемые функции DBReader

  • ✅︎ columns (см. примечание ниже)
  • ✅︎ where (см. примечание ниже)
  • ✅︎ hwm (см. примечание ниже), поддерживаемые стратегии:
  • ✅︎ Snapshot
  • ✅︎ Incremental
  • ✅︎ Snapshot batch
  • ✅︎ Incremental batch
  • hint (не поддерживается Greenplum)
  • df_schema
  • ✅︎ options (см. Greenplum.ReadOptions)

Warning

В случае коннектора Greenplum, DBReader не генерирует сырой запрос SELECT. Вместо этого он полагается на синтаксис Spark SQL, который в некоторых случаях (при использовании проекции столбцов и предикатов) может быть преобразован в SQL Greenplum.

Поэтому columns, where и hwm.expression должны быть указаны в синтаксисе Spark SQL, а не в SQL Greenplum.

Это правильно:

    DBReader(
        columns=[
            "some_column",
            # это преобразование выполняется на стороне Spark
            "CAST(another_column AS STRING)",
        ],
        # этот предикат анализируется Spark и может быть передан в Greenplum
        where="some_column LIKE 'val1%'",
    ) 

А это приведет к ошибке:

    DBReader(
        columns=[
            "some_column",
            # у Spark нет типа `text`
            "CAST(another_column AS text)",
        ],
        # Spark не поддерживает синтаксис ~ для сопоставления регулярных выражений
        where="some_column ~ 'val1.*'",
    ) 

Примеры

Стратегия Snapshot:

```python
    from onetl.connection import Greenplum
    from onetl.db import DBReader

    greenplum = Greenplum(...)

    reader = DBReader(
        connection=greenplum,
        source="schema.table",
        columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
        where="key = 'something'",
    )
    df = reader.run() 

```

Стратегия Incremental:

```python
    from onetl.connection import Greenplum
    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy

    greenplum = Greenplum(...)

    reader = DBReader(
        connection=greenplum,
        source="schema.table",
        columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
        where="key = 'something'",
        hwm=DBReader.AutoDetectHWM(name="greenplum_hwm", expression="updated_dt"),
    )

    with IncrementalStrategy():
        df = reader.run()
```

Схема взаимодействия

Высокоуровневая схема описана в Greenplum prerequisites. Ниже вы можете найти подробную схему взаимодействия.

Схема взаимодействия Spark <-> Greenplum во время DBReader.run()
---
title: Greenplum master <-> Spark driver
---

sequenceDiagram
    box "Spark"
    participant A as "Spark driver"
    participant B as "Spark executor1"
    participant C as "Spark executor2"
    participant D as "Spark executorN"
    end

    box "Greenplum"
    participant E as "Greenplum master"
    participant F as "Greenplum segment1"
    participant G as "Greenplum segment2"
    participant H as "Greenplum segmentN"
    end

    note over A,H: == Greenplum.check() ==

    activate A
    activate E
    A ->> E: CONNECT

    A -->> E : CHECK IF TABLE EXISTS gp_table
    E -->> A : TABLE EXISTS
    A ->> E : SHOW SCHEMA FOR gp_table
    E -->> A : (id bigint, col1 int, col2 text, ...)

    note over A,H: == DBReader.run() ==

    A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
    A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
    A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N

    note right of A : This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V
    B ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port <br/>INSERT INTO EXTERNAL TABLE spark_executor1 FROM gp_table WHERE gp_segment_id = 1
    note right of E : Each white vertical line here is a opened connection to master.<br/>Usually, **N+1** connections are created from Spark to Greenplum master
    activate E
    E -->> F: SELECT DATA FROM gp_table_data_on_segment1 TO spark_executor1
    note right of F : No direct requests between Greenplum segments & Spark driver.<br/>Data transfer is always initiated by Greenplum segments.


    C ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port <br/>INSERT INTO EXTERNAL TABLE spark_executor2 FROM gp_table WHERE gp_segment_id = 2
    activate E
    E -->> G: SELECT DATA FROM gp_table_data_on_segment2 TO spark_executor2

    D ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port <br/>INSERT INTO EXTERNAL TABLE spark_executorN FROM gp_table WHERE gp_segment_id = N
    activate E
    E -->> H: SELECT DATA FROM gp_table_data_on_segmentN TO spark_executorN

    F -xB: INITIALIZE CONNECTION TO Spark executor1<br/>PUSH DATA TO Spark executor1
    note left of B : Circle is an open GPFDIST port,<br/>listened by executor

    G -xC: INITIALIZE CONNECTION TO Spark executor2<br/>PUSH DATA TO Spark executor2
    H -xD: INITIALIZE CONNECTION TO Spark executorN<br/>PUSH DATA TO Spark executorN

    note over A,H: == Spark.stop() ==

    B -->> E : DROP TABLE spark_executor1
    deactivate E
    C -->> E : DROP TABLE spark_executor2
    deactivate E
    D -->> E : DROP TABLE spark_executorN
    deactivate E

    B -->> A: DONE
    C -->> A: DONE
    D -->> A: DONE

    A -->> E : CLOSE CONNECTION
    deactivate E
    deactivate A

Рекомендации

Выбирайте только необходимые столбцы

Вместо передачи "*" в DBReader(columns=[...]) предпочтительнее передавать точные имена столбцов. Это уменьшает объем данных, передаваемых из Greenplum в Spark.

Обратите внимание на значение where

Вместо фильтрации данных на стороне Spark с использованием df.filter(df.column == 'value') передайте правильное условие DBReader(where="column = 'value'"). Это уменьшает объем данных, отправляемых из Greenplum в Spark, и может улучшить производительность запроса. Особенно если есть индексы или разделы для столбцов, используемых в условии where.

Параллельное чтение данных

DBReader в случае коннектора Greenplum требует, чтобы представление или таблица имели столбец, который используется Spark для параллельного чтения.

Выбор правильного столбца позволяет каждому исполнителю Spark читать только часть данных, хранящихся в указанном сегменте, избегая перемещения больших объемов данных между сегментами, что улучшает производительность чтения.

Использование gp_segment_id

По умолчанию DBReader будет использовать столбец gp_segment_id для параллельного чтения данных. Каждый раздел DataFrame будет содержать данные конкретного сегмента Greenplum.

Это позволяет каждому исполнителю Spark читать только данные из определенного сегмента Greenplum, избегая перемещения больших объемов данных между сегментами.

Если используется представление, рекомендуется включить столбец gp_segment_id в это представление:

Чтение из представления со столбцом gp_segment_id
    from onetl.connection import Greenplum
    from onetl.db import DBReader

    greenplum = Greenplum(...)

    greenplum.execute(
        """
        CREATE VIEW schema.view_with_gp_segment_id AS
        SELECT
            id,
            some_column,
            another_column,
            gp_segment_id  -- ВАЖНО
        FROM schema.some_table
        """,
    )

    reader = DBReader(
        connection=greenplum,
        source="schema.view_with_gp_segment_id",
    )
    df = reader.run() 

Использование пользовательского partition_column

Иногда в таблице или представлении отсутствует столбец gp_segment_id, но есть некоторый столбец с диапазоном значений, коррелирующим с распределением сегмента Greenplum.

В этом случае вместо него можно использовать пользовательский столбец:

Чтение из представления с пользовательским partition_column
    from onetl.connection import Greenplum
    from onetl.db import DBReader

    greenplum = Greenplum(...)

    greenplum.execute(
        """
        CREATE VIEW schema.view_with_partition_column AS
        SELECT
            id,
            some_column,
            part_column  -- коррелирует с ID сегмента greenplum
        FROM schema.some_table
        """,
    )

    reader = DBReader(
        connection=greenplum,
        source="schema.view_with_partition_column",
        options=Greenplum.ReadOptions(
            # параллельное чтение данных с использованием указанного столбца
            partitionColumn="part_column",
            # создание 10 задач Spark, каждая будет читать только часть данных таблицы
            partitions=10,
        ),
    )
    df = reader.run() 

Чтение таблиц DISTRIBUTED REPLICATED

Реплицированные таблицы вообще не имеют столбца gp_segment_id, поэтому вам нужно установить partition_column на имя некоторого столбца типа integer/bigint/smallint.

Параллельное выполнение JOIN

В случае использования представлений, которые требуют перемещения данных между сегментами Greenplum, например запросов JOIN, следует использовать другой подход.

Каждый экзекутор Spark из N будет выполнять один и тот же запрос, поэтому каждый из N запросов запустит свой собственный процесс JOIN, что приведет к очень высокой нагрузке на сегменты Greenplum.

Этого следует избегать.

Вместо этого рекомендуется выполнить запрос JOIN на стороне Greenplum, сохранить результат во временной таблице, а затем прочитать эту таблицу с помощью DBReader:

Чтение из представления с использованием промежуточной таблицы
    from onetl.connection import Greenplum
    from onetl.db import DBReader

    greenplum = Greenplum(...)

    greenplum.execute(
        """
        CREATE UNLOGGED TABLE schema.intermediate_table AS
        SELECT
            id,
            tbl1.col1,
            tbl1.data,
            tbl2.another_data
        FROM
            schema.table1 as tbl1
        JOIN
            schema.table2 as tbl2
        ON
            tbl1.col1 = tbl2.col2
        WHERE ...
        """,
    )

    reader = DBReader(
        connection=greenplum,
        source="schema.intermediate_table",
    )
    df = reader.run()

    # запись dataframe куда-либо

    greenplum.execute(
        """
        DROP TABLE schema.intermediate_table
        """,
    ) 

Warning

НИКОГДА не делайте так:

    df1 = DBReader(connection=greenplum, target="public.table1", ...).run()
    df2 = DBReader(connection=greenplum, target="public.table2", ...).run()

    joined_df = df1.join(df2, on="col") 

Это приведет к отправке всех данных из обеих таблиц table1 и table2 в память исполнителя Spark, а затем JOIN будет выполнен на стороне Spark, а не внутри Greenplum. Это ОЧЕНЬ неэффективно.

Примечание о таблицах TEMPORARY

Кто-то может подумать, что запись данных из представления или результата JOIN во временную таблицу (TEMPORARY), а затем передача ее в DBReader, является эффективным способом чтения данных из Greenplum. Это потому, что временные таблицы не генерируют файлы WAL и автоматически удаляются после завершения транзакции.

Это НЕ сработает. Каждый экзекутор Spark устанавливает свое собственное соединение с Greenplum. И каждое соединение начинает свою собственную транзакцию, что означает, что каждый исполнитель будет читать пустую временную таблицу.

Вам следует использовать таблицы UNLOGGED для записи данных во временную таблицу без генерации журналов WAL.

Параметры

GreenplumReadOptions

onetl.connection.db_connection.greenplum.options.GreenplumReadOptions

Bases: JDBCOptions

VMware's Greenplum Spark connector reading options.

.. warning::

Some options, like ``url``, ``dbtable``, ``server.*``, ``pool.*``,
etc are populated from connection attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.
Examples

.. note ::

You can pass any value
`supported by connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/read_from_gpdb.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

The set of supported options depends on connector version.

.. code:: python

from onetl.connection import Greenplum

options = Greenplum.ReadOptions(
    partitionColumn="reg_id",
    partitions=10,
)
Source code in onetl/connection/db_connection/greenplum/options.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class GreenplumReadOptions(JDBCOptions):
    """VMware's Greenplum Spark connector reading options.

    .. warning::

        Some options, like ``url``, ``dbtable``, ``server.*``, ``pool.*``,
        etc are populated from connection attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.

    Examples
    --------

    .. note ::

        You can pass any value
        `supported by connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/read_from_gpdb.html>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on connector version.

    .. code:: python

        from onetl.connection import Greenplum

        options = Greenplum.ReadOptions(
            partitionColumn="reg_id",
            partitions=10,
        )
    """

    class Config:
        known_options = READ_OPTIONS | READ_WRITE_OPTIONS
        prohibited_options = JDBCOptions.Config.prohibited_options | GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS

    partition_column: Optional[str] = Field(alias="partitionColumn")
    """Column used to parallelize reading from a table.

    .. warning::

        You should not change this option, unless you know what you're doing.

        It's preferable to use default values to read data parallel by number of segments in Greenplum cluster.

    Possible values:
        * ``None`` (default):
            Spark generates N jobs (where N == number of segments in Greenplum cluster),
            each job is reading only data from a specific segment
            (filtering data by ``gp_segment_id`` column).

            This is very effective way to fetch the data from a cluster.

        * table column
            Allocate each executor a range of values from a specific column.

            Spark generates for each executor an SQL query:

            Executor 1:

            .. code:: sql

                SELECT ... FROM table
                WHERE (partition_column >= lowerBound
                        OR partition_column IS NULL)
                AND partition_column < (lower_bound + stride)

            Executor 2:

            .. code:: sql

                SELECT ... FROM table
                WHERE partition_column >= (lower_bound + stride)
                AND partition_column < (lower_bound + 2 * stride)

            ...

            Executor N:

            .. code:: sql

                SELECT ... FROM table
                WHERE partition_column >= (lower_bound + (N-1) * stride)
                AND partition_column <= upper_bound

            Where ``stride=(upper_bound - lower_bound) / num_partitions``,
            ``lower_bound=MIN(partition_column)``, ``upper_bound=MAX(partition_column)``.

            .. note::

                Column type must be numeric. Other types are not supported.

            .. note::

                :obj:`~num_partitions` is used just to
                calculate the partition stride, **NOT** for filtering the rows in table.
                So all rows in the table will be returned (unlike *Incremental* :ref:`strategy`).

            .. note::

                All queries are executed in parallel. To execute them sequentially, use *Batch* :ref:`strategy`.

    .. warning::

        Both options :obj:`~partition_column` and :obj:`~num_partitions` should have a value,
        or both should be ``None``

    Examples
    --------

    Read data in 10 parallel jobs by range of values in ``id_column`` column:

    .. code:: python

        Greenplum.ReadOptions(
            partitionColumn="id_column",
            partitions=10,
        )
    """

    num_partitions: Optional[int] = Field(alias="partitions")
    """Number of jobs created by Spark to read the table content in parallel.

    See documentation for :obj:`~partition_column` for more details

    .. warning::

        By default connector uses number of segments in the Greenplum cluster.
        You should not change this option, unless you know what you're doing

    .. warning::

        Both options :obj:`~partition_column` and :obj:`~num_partitions` should have a value,
        or both should be ``None``
    """

num_partitions = Field(alias='partitions') class-attribute instance-attribute

Number of jobs created by Spark to read the table content in parallel.

See documentation for :obj:~partition_column for more details

.. warning::

By default connector uses number of segments in the Greenplum cluster.
You should not change this option, unless you know what you're doing

.. warning::

Both options :obj:`~partition_column` and :obj:`~num_partitions` should have a value,
or both should be ``None``

partition_column = Field(alias='partitionColumn') class-attribute instance-attribute

Column used to parallelize reading from a table.

.. warning::

You should not change this option, unless you know what you're doing.

It's preferable to use default values to read data parallel by number of segments in Greenplum cluster.
Possible values
  • None (default): Spark generates N jobs (where N == number of segments in Greenplum cluster), each job is reading only data from a specific segment (filtering data by gp_segment_id column).

    This is very effective way to fetch the data from a cluster.

  • table column Allocate each executor a range of values from a specific column.

    Spark generates for each executor an SQL query:

    Executor 1:

    .. code:: sql

    SELECT ... FROM table
    WHERE (partition_column >= lowerBound
            OR partition_column IS NULL)
    AND partition_column < (lower_bound + stride)
    

    Executor 2:

    .. code:: sql

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + stride)
    AND partition_column < (lower_bound + 2 * stride)
    

    ...

    Executor N:

    .. code:: sql

    SELECT ... FROM table
    WHERE partition_column >= (lower_bound + (N-1) * stride)
    AND partition_column <= upper_bound
    

    Where stride=(upper_bound - lower_bound) / num_partitions, lower_bound=MIN(partition_column), upper_bound=MAX(partition_column).

    .. note::

    Column type must be numeric. Other types are not supported.
    

    .. note::

    :obj:`~num_partitions` is used just to
    calculate the partition stride, **NOT** for filtering the rows in table.
    So all rows in the table will be returned (unlike *Incremental* :ref:`strategy`).
    

    .. note::

    All queries are executed in parallel. To execute them sequentially, use *Batch* :ref:`strategy`.
    

.. warning::

Both options :obj:`~partition_column` and :obj:`~num_partitions` should have a value,
or both should be ``None``
Examples

Read data in 10 parallel jobs by range of values in id_column column:

.. code:: python

Greenplum.ReadOptions(
    partitionColumn="id_column",
    partitions=10,
)