
Writing to Greenplum using DBWriter

To write data to Greenplum, use DBWriter with GreenplumWriteOptions.

Warning

Please take Greenplum data types into account.

Warning

It is always recommended to create the table explicitly using Greenplum.execute instead of relying on Spark's table DDL generation.

This is because Spark's DDL generator can create columns with types different from the ones you expect.
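A minimal sketch of the explicit-DDL approach. The table name, columns and distribution key below are illustrative assumptions, not part of this documentation:

```python
# Sketch: create the target table explicitly before writing.
# Table name, columns and distribution key are illustrative assumptions.
ddl = """
CREATE TABLE schema.table (
    id bigint,
    business_dt timestamp(6),
    value text
)
DISTRIBUTED BY (id)
""".strip()

# With a real connection object, run the DDL once before DBWriter.run():
# greenplum = Greenplum(...)
# greenplum.execute(ddl)
```

This keeps column types under your control instead of whatever Spark's DDL generator would pick.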

Examples

```python
from onetl.connection import Greenplum
from onetl.db import DBWriter

greenplum = Greenplum(...)

df = ...  # your data goes here

writer = DBWriter(
    connection=greenplum,
    target="schema.table",
    options=Greenplum.WriteOptions(
        if_exists="append",
        # distribution is random by default
        distributedBy="id",
        # partitionBy is not supported
    ),
)

writer.run(df) 
```
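For a full refresh, the documented `replace_entire_table` mode can be used instead of `append`. A sketch of the option values (shown as plain values; with onetl installed they would be passed to Greenplum.WriteOptions):

```python
# Sketch: option values for a full-refresh write, taken from the documented
# if_exists modes. truncate="true" keeps the existing table DDL
# (TRUNCATE instead of DROP + CREATE TABLE).
options = {
    "if_exists": "replace_entire_table",
    "truncate": "true",
    "distributedBy": "id",  # illustrative distribution column
}

# With onetl installed:
# writer = DBWriter(
#     connection=greenplum,
#     target="schema.table",
#     options=Greenplum.WriteOptions(**options),
# )
# writer.run(df)
```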

Interaction schema

The high-level schema is described in Greenplum prerequisites. A detailed interaction schema is shown below.

Spark <-> Greenplum interaction during DBWriter.run()

```mermaid
---
title: Greenplum master <-> Spark driver
---

sequenceDiagram
    box Spark
    participant A as Spark driver
    participant B as Spark executor1
    participant C as Spark executor2
    participant D as Spark executorN
    end

    box Greenplum
    participant E as Greenplum master
    participant F as Greenplum segment1
    participant G as Greenplum segment2
    participant H as Greenplum segmentN
    end

    note over A,H: == Greenplum.check() ==
    A ->> E: CONNECT
    activate E
    activate A

    A -->> E : CHECK IF TABLE EXISTS gp_table
    E -->> A : TABLE EXISTS
    A ->> E : SHOW SCHEMA FOR gp_table
    E -->> A : (id bigint, col1 int, col2 text, ...)

    note over A,H: == DBWriter.run() ==

    A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
    activate B
    A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
    activate C
    A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N
    activate D

    note right of A : This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V

    B ->> E: CREATE READABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executor1
    activate E
    note right of E : Each white vertical line here is an open connection to the master.<br/>Usually **N+1** connections are created from Spark to the Greenplum master
    E -->> F: INSERT DATA FROM spark_executor1 INTO gp_table_data_on_segment1
    activate F

    note right of F : No direct requests between Greenplum segments & the Spark driver.<br/>Data transfer is always initiated by Greenplum segments.

    C ->> E: CREATE READABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executor2
    activate E
    E -->> G: INSERT DATA FROM spark_executor2 INTO gp_table_data_on_segment2
    activate G

    D ->> E: CREATE READABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executorN
    activate E
    E -->> H: INSERT DATA FROM spark_executorN INTO gp_table_data_on_segmentN
    activate H


    F -x B: INITIALIZE CONNECTION TO Spark executor1<br/>PULL DATA FROM Spark executor1
    deactivate F
    note left of B : Circle is an open GPFDIST port,<br/>listened on by the executor

    G -x C: INITIALIZE CONNECTION TO Spark executor2<br/>PULL DATA FROM Spark executor2
    deactivate G
    H -x D: INITIALIZE CONNECTION TO Spark executorN<br/>PULL DATA FROM Spark executorN
    deactivate H

    note over A,H: == Spark.stop() ==

    B -->> E : DROP TABLE spark_executor1
    deactivate E
    C -->> E : DROP TABLE spark_executor2
    deactivate E
    D -->> E : DROP TABLE spark_executorN
    deactivate E

    B -->> A: DONE
    deactivate B
    C -->> A: DONE
    deactivate C
    D -->> A: DONE
    deactivate D


    A -->> E: CLOSE CONNECTION
    deactivate E
    deactivate A
```

Options

onetl.connection.db_connection.greenplum.options.GreenplumWriteOptions

Bases: JDBCOptions

VMware's Greenplum Spark connector writing options.

.. warning::

    Some options, like ``url``, ``dbtable``, ``server.*``, ``pool.*``, etc.
    are populated from connection attributes, and cannot be overridden by the user in ``WriteOptions`` to avoid issues.

Examples

.. note::

    You can pass any value
    `supported by the connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/write_to_gpdb.html>`_,
    even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

    The set of supported options depends on the connector version.

.. code:: python

    from onetl.connection import Greenplum

    options = Greenplum.WriteOptions(
        if_exists="append",
        truncate="false",
        distributedBy="mycolumn",
    )
Source code in onetl/connection/db_connection/greenplum/options.py
class GreenplumWriteOptions(JDBCOptions):
    """VMware's Greenplum Spark connector writing options.

    .. warning::

        Some options, like ``url``, ``dbtable``, ``server.*``, ``pool.*``, etc
        are populated from connection attributes, and cannot be overridden by the user in ``WriteOptions`` to avoid issues.

    Examples
    --------

    .. note ::

        You can pass any value
        `supported by connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/write_to_gpdb.html>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on connector version.

    .. code:: python

        from onetl.connection import Greenplum

        options = Greenplum.WriteOptions(
            if_exists="append",
            truncate="false",
            distributedBy="mycolumn",
        )
    """

    class Config:
        known_options = WRITE_OPTIONS | READ_WRITE_OPTIONS
        prohibited_options = JDBCOptions.Config.prohibited_options | GENERIC_PROHIBITED_OPTIONS | READ_OPTIONS

    if_exists: GreenplumTableExistBehavior = Field(  # type: ignore[literal-required]
        default=GreenplumTableExistBehavior.APPEND,
        alias=avoid_alias("mode"),
    )
    """Behavior of writing data into existing table.

    Possible values:
        * ``append`` (default)
            Adds new rows into existing table.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user
                    (``distributedBy`` and others).

                * Table exists
                    Data is appended to a table. Table has the same DDL as before writing data.

                    .. warning::

                        This mode does not check whether table already contains
                        rows from dataframe, so duplicated rows can be created.

                        Also Spark does not support passing custom options to
                        insert statement, like ``ON CONFLICT``, so don't try to
                        implement deduplication using unique indexes or constraints.

                        Instead, write to staging table and perform deduplication
                        using :obj:`~execute` method.

        * ``replace_entire_table``
            **Table is dropped and then created**.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user
                    (``distributedBy`` and others).

                * Table exists
                    Table content is replaced with dataframe content.

                    After writing completed, target table could either have the same DDL as
                    before writing data (``truncate=True``), or can be recreated (``truncate=False``).

        * ``ignore``
            Ignores the write operation if the table already exists.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user
                    (``distributedBy`` and others).

                * Table exists
                    The write operation is ignored, and no data is written to the table.

        * ``error``
            Raises an error if the table already exists.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user
                    (``distributedBy`` and others).

                * Table exists
                    An error is raised, and no data is written to the table.

    .. versionchanged:: 0.9.0
        Renamed ``mode`` → ``if_exists``
    """

    @root_validator(pre=True)
    def _mode_is_deprecated(cls, values):
        if "mode" in values:
            warnings.warn(
                "Option `Greenplum.WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
                "Use `Greenplum.WriteOptions(if_exists=...)` instead",
                category=UserWarning,
                stacklevel=3,
            )
        return values

if_exists = Field(default=GreenplumTableExistBehavior.APPEND, alias=avoid_alias("mode"))

Behavior of writing data into existing table.

Possible values
  • append (default) Adds new rows into existing table.

    .. dropdown:: Behavior in details

    * Table does not exist
        Table is created using options provided by user
        (``distributedBy`` and others).
    
    * Table exists
        Data is appended to a table. Table has the same DDL as before writing data.
    
        .. warning::
    
            This mode does not check whether table already contains
            rows from dataframe, so duplicated rows can be created.
    
            Also Spark does not support passing custom options to
            insert statement, like ``ON CONFLICT``, so don't try to
            implement deduplication using unique indexes or constraints.
    
            Instead, write to staging table and perform deduplication
            using :obj:`~execute` method.
    
  • replace_entire_table Table is dropped and then created.

    .. dropdown:: Behavior in details

    * Table does not exist
        Table is created using options provided by user
        (``distributedBy`` and others).
    
    * Table exists
        Table content is replaced with dataframe content.
    
        After writing completed, target table could either have the same DDL as
        before writing data (``truncate=True``), or can be recreated (``truncate=False``).
    
  • ignore Ignores the write operation if the table already exists.

    .. dropdown:: Behavior in details

    * Table does not exist
        Table is created using options provided by user
        (``distributedBy`` and others).
    
    * Table exists
        The write operation is ignored, and no data is written to the table.
    
  • error Raises an error if the table already exists.

    .. dropdown:: Behavior in details

    * Table does not exist
        Table is created using options provided by user
        (``distributedBy`` and others).
    
    * Table exists
        An error is raised, and no data is written to the table.
    

.. versionchanged:: 0.9.0
    Renamed ``mode`` → ``if_exists``
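The warning above recommends deduplicating through a staging table and the ``execute`` method rather than relying on constraints. A minimal sketch of that pattern, assuming hypothetical ``schema.staging`` / ``schema.target`` tables with an ``id`` key (all names are illustrative):

```python
# Sketch: append-with-deduplication via a staging table.
# Table and column names are illustrative assumptions.

# 1. Write the dataframe into a staging table, replacing its previous content:
# DBWriter(
#     connection=greenplum,
#     target="schema.staging",
#     options=Greenplum.WriteOptions(if_exists="replace_entire_table"),
# ).run(df)

# 2. Move only rows missing from the target, using Greenplum.execute:
merge_sql = """
INSERT INTO schema.target (id, col1, col2)
SELECT stg.id, stg.col1, stg.col2
FROM schema.staging AS stg
LEFT JOIN schema.target AS tgt ON tgt.id = stg.id
WHERE tgt.id IS NULL
"""
# greenplum.execute(merge_sql)
```

This avoids relying on ``ON CONFLICT`` or unique indexes, which the connector's insert statements do not support.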