Writing to Greenplum using DBWriter

To write data to Greenplum, use DBWriter with GreenplumWriteOptions.

Warning

Take Greenplum data types into account.

Warning

It is always recommended to create the table explicitly using Greenplum.execute instead of relying on Spark's table DDL generation.
This is because Spark's DDL generator can create columns with types different from the ones you expect.
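As a sketch of that recommendation (the schema, table, and column names below are hypothetical), the DDL can be built explicitly and then passed to Greenplum.execute, so the column types are exactly the ones you choose:

```python
# Hypothetical schema, table, and column types; adjust to your data.
# Creating the table explicitly pins exact Greenplum types instead of
# letting Spark's DDL generator choose them.
ddl = """
CREATE TABLE schema.table (
    id bigint,
    col1 integer,
    col2 varchar(200)
)
DISTRIBUTED BY (id)
""".strip()

# With a live connection this DDL would be executed as:
#   greenplum.execute(ddl)
print(ddl)
```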
Examples
```python
from onetl.connection import Greenplum
from onetl.db import DBWriter

greenplum = Greenplum(...)

df = ...  # data is here

writer = DBWriter(
    connection=greenplum,
    target="schema.table",
    options=Greenplum.WriteOptions(
        if_exists="append",
        # distribution is random by default
        distributedBy="id",
        # partitionBy is not supported
    ),
)

writer.run(df)
```
Interaction schema

The high-level schema is described in the Greenplum prerequisites. A detailed interaction schema is shown below.

Spark <-> Greenplum interaction during DBWriter.run()
```mermaid
---
title: Greenplum master <-> Spark driver
---
sequenceDiagram
    box Spark
        participant A as Spark driver
        participant B as Spark executor1
        participant C as Spark executor2
        participant D as Spark executorN
    end
    box Greenplum
        participant E as Greenplum master
        participant F as Greenplum segment1
        participant G as Greenplum segment2
        participant H as Greenplum segmentN
    end

    note over A,H: == Greenplum.check() ==

    A ->> E: CONNECT
    activate E
    activate A

    A -->> E: CHECK IF TABLE EXISTS gp_table
    E -->> A: TABLE EXISTS
    A ->> E: SHOW SCHEMA FOR gp_table
    E -->> A: (id bigint, col1 int, col2 text, ...)

    note over A,H: == DBWriter.run() ==

    A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
    activate B
    A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
    activate C
    A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N
    activate D
    note right of A: This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V

    B ->> E: CREATE READABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executor1
    activate E
    note right of E: Each white vertical line here is an open connection to the master.<br/>Usually **N+1** connections are created from Spark to the Greenplum master
    E -->> F: SELECT DATA FROM spark_executor1 TO gp_table_data_on_segment1
    activate F
    note right of F: No direct requests between Greenplum segments & Spark driver.<br/>Data transfer is always initiated by Greenplum segments.

    C ->> E: CREATE READABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executor2
    activate E
    E -->> G: SELECT DATA FROM spark_executor2 TO gp_table_data_on_segment2
    activate G

    D ->> E: CREATE READABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port<br/>INSERT INTO gp_table FROM EXTERNAL TABLE spark_executorN
    activate E
    E -->> H: SELECT DATA FROM spark_executorN TO gp_table_data_on_segmentN
    activate H

    F -xB: INITIALIZE CONNECTION TO Spark executor1<br/>PULL DATA FROM Spark executor1
    deactivate F
    note left of B: Circle is an open GPFDIST port,<br/>listened by the executor
    G -xC: INITIALIZE CONNECTION TO Spark executor2<br/>PULL DATA FROM Spark executor2
    deactivate G
    H -xD: INITIALIZE CONNECTION TO Spark executorN<br/>PULL DATA FROM Spark executorN
    deactivate H

    note over A,H: == Spark.stop() ==

    B -->> E: DROP TABLE spark_executor1
    deactivate E
    C -->> E: DROP TABLE spark_executor2
    deactivate E
    D -->> E: DROP TABLE spark_executorN
    deactivate E

    B -->> A: DONE
    deactivate B
    C -->> A: DONE
    deactivate C
    D -->> A: DONE
    deactivate D

    A -->> E: CLOSE CONNECTION
    deactivate E
    deactivate A
```
Options
onetl.connection.db_connection.greenplum.options.GreenplumWriteOptions
Bases: JDBCOptions
VMware's Greenplum Spark connector writing options.
.. warning::

    Some options, like ``url``, ``dbtable``, ``server.*``, ``pool.*``, etc.,
    are populated from connection attributes and cannot be overridden by the user in ``WriteOptions`` to avoid issues.
Examples
.. note::

    You can pass any value
    `supported by connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/write_to_gpdb.html>`_,
    even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

    The set of supported options depends on the connector version.
.. code:: python

    from onetl.connection import Greenplum

    options = Greenplum.WriteOptions(
        if_exists="append",
        truncate="false",
        distributedBy="mycolumn",
    )
Source code in onetl/connection/db_connection/greenplum/options.py
if_exists = Field(default=GreenplumTableExistBehavior.APPEND, alias=avoid_alias("mode"))
Behavior of writing data into an existing table.

Possible values:

* ``append`` (default): adds new rows into the existing table.

  * Table does not exist: table is created using options provided by the user (``distributedBy`` and others).
  * Table exists: data is appended to the table. The table keeps the same DDL as before writing.

  .. warning::

    This mode does not check whether the table already contains rows from the dataframe, so duplicate rows can be created.
    Spark also does not support passing custom options to the insert statement, like ``ON CONFLICT``, so do not try to implement
    deduplication using unique indexes or constraints. Instead, write to a staging table and perform deduplication
    using the :obj:`~execute` method.

* ``replace_entire_table``: table is dropped and then created.

  * Table does not exist: table is created using options provided by the user (``distributedBy`` and others).
  * Table exists: table content is replaced with the dataframe content. After writing completes, the target table either
    keeps the same DDL as before writing (``truncate=True``) or is recreated (``truncate=False``).

* ``ignore``: ignores the write operation if the table already exists.

  * Table does not exist: table is created using options provided by the user (``distributedBy`` and others).
  * Table exists: the write operation is ignored, and no data is written to the table.

* ``error``: raises an error if the table already exists.

  * Table does not exist: table is created using options provided by the user (``distributedBy`` and others).
  * Table exists: an error is raised, and no data is written to the table.
.. versionchanged:: 0.9.0

    Renamed ``mode`` → ``if_exists``