Чтение из Greenplum с использованием DBReader
Данные могут быть прочитаны из Greenplum в Spark с использованием DBReader. Он также поддерживает стратегии для инкрементального чтения данных.
Warning
Пожалуйста, учитывайте типы данных Greenplum.
Note
В отличие от коннекторов JDBC, Greenplum connector for Spark не поддерживает выполнение пользовательских SQL-запросов с использованием метода .sql. Коннектор может использоваться только для чтения данных из таблицы или представления.
Поддерживаемые функции DBReader
- ✅︎
columns(см. примечание ниже) - ✅︎
where(см. примечание ниже) - ✅︎
hwm(см. примечание ниже), поддерживаемые стратегии: - ✅︎ Snapshot
- ✅︎ Incremental
- ✅︎ Snapshot batch
- ✅︎ Incremental batch
- ❌
hint(не поддерживается Greenplum) - ❌
df_schema - ✅︎
options(см. Greenplum.ReadOptions)
Warning
В случае коннектора Greenplum, DBReader не генерирует сырой запрос SELECT. Вместо этого он полагается на синтаксис Spark SQL, который в некоторых случаях (при использовании проекции столбцов и предикатов) может быть преобразован в SQL Greenplum.
Поэтому columns, where и hwm.expression должны быть указаны в синтаксисе Spark SQL, а не в SQL Greenplum.
Это правильно:
DBReader(
columns=[
"some_column",
# это преобразование выполняется на стороне Spark
"CAST(another_column AS STRING)",
],
# этот предикат анализируется Spark и может быть передан в Greenplum
where="some_column LIKE 'val1%'",
)
А это приведет к ошибке:
DBReader(
columns=[
"some_column",
# у Spark нет типа `text`
"CAST(another_column AS text)",
],
# Spark не поддерживает синтаксис ~ для сопоставления регулярных выражений
where="some_column ~ 'val1.*'",
)
Примеры
Стратегия Snapshot:
```python
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
reader = DBReader(
connection=greenplum,
source="schema.table",
columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
where="key = 'something'",
)
df = reader.run()
```
Стратегия Incremental:
```python
from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
greenplum = Greenplum(...)
reader = DBReader(
connection=greenplum,
source="schema.table",
columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
where="key = 'something'",
hwm=DBReader.AutoDetectHWM(name="greenplum_hwm", expression="updated_dt"),
)
with IncrementalStrategy():
df = reader.run()
```
Схема взаимодействия
Высокоуровневая схема описана в Greenplum prerequisites. Ниже вы можете найти подробную схему взаимодействия.
Схема взаимодействия Spark <-> Greenplum во время DBReader.run()
---
title: Greenplum master <-> Spark driver
---
sequenceDiagram
box "Spark"
participant A as "Spark driver"
participant B as "Spark executor1"
participant C as "Spark executor2"
participant D as "Spark executorN"
end
box "Greenplum"
participant E as "Greenplum master"
participant F as "Greenplum segment1"
participant G as "Greenplum segment2"
participant H as "Greenplum segmentN"
end
note over A,H: == Greenplum.check() ==
activate A
activate E
A ->> E: CONNECT
A -->> E : CHECK IF TABLE EXISTS gp_table
E -->> A : TABLE EXISTS
A ->> E : SHOW SCHEMA FOR gp_table
E -->> A : (id bigint, col1 int, col2 text, ...)
note over A,H: == DBReader.run() ==
A ->> B: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
A ->> C: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
A ->> D: START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N
note right of A : This is done in parallel,<br/>executors are independent<br/>|<br/>|<br/>|<br/>V
B ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor1_host:executor1_port <br/>INSERT INTO EXTERNAL TABLE spark_executor1 FROM gp_table WHERE gp_segment_id = 1
note right of E : Each white vertical line here is a opened connection to master.<br/>Usually, **N+1** connections are created from Spark to Greenplum master
activate E
E -->> F: SELECT DATA FROM gp_table_data_on_segment1 TO spark_executor1
note right of F : No direct requests between Greenplum segments & Spark driver.<br/>Data transfer is always initiated by Greenplum segments.
C ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...)<br/>USING address=executor2_host:executor2_port <br/>INSERT INTO EXTERNAL TABLE spark_executor2 FROM gp_table WHERE gp_segment_id = 2
activate E
E -->> G: SELECT DATA FROM gp_table_data_on_segment2 TO spark_executor2
D ->> E: CREATE WRITABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...)<br/>USING address=executorN_host:executorN_port <br/>INSERT INTO EXTERNAL TABLE spark_executorN FROM gp_table WHERE gp_segment_id = N
activate E
E -->> H: SELECT DATA FROM gp_table_data_on_segmentN TO spark_executorN
F -xB: INITIALIZE CONNECTION TO Spark executor1<br/>PUSH DATA TO Spark executor1
note left of B : Circle is an open GPFDIST port,<br/>listened by executor
G -xC: INITIALIZE CONNECTION TO Spark executor2<br/>PUSH DATA TO Spark executor2
H -xD: INITIALIZE CONNECTION TO Spark executorN<br/>PUSH DATA TO Spark executorN
note over A,H: == Spark.stop() ==
B -->> E : DROP TABLE spark_executor1
deactivate E
C -->> E : DROP TABLE spark_executor2
deactivate E
D -->> E : DROP TABLE spark_executorN
deactivate E
B -->> A: DONE
C -->> A: DONE
D -->> A: DONE
A -->> E : CLOSE CONNECTION
deactivate E
deactivate A
Рекомендации
Выбирайте только необходимые столбцы
Вместо передачи "*" в DBReader(columns=[...]) предпочтительнее передавать точные имена столбцов. Это уменьшает объем данных, передаваемых из Greenplum в Spark.
Обратите внимание на значение where
Вместо фильтрации данных на стороне Spark с использованием df.filter(df.column == 'value') передайте правильное условие DBReader(where="column = 'value'"). Это уменьшает объем данных, отправляемых из Greenplum в Spark, и может улучшить производительность запроса. Особенно если есть индексы или разделы для столбцов, используемых в условии where.
Параллельное чтение данных
DBReader в случае коннектора Greenplum требует, чтобы представление или таблица имели столбец, который используется Spark для параллельного чтения.
Выбор правильного столбца позволяет каждому исполнителю Spark читать только часть данных, хранящихся в указанном сегменте, избегая перемещения больших объемов данных между сегментами, что улучшает производительность чтения.
Использование gp_segment_id
По умолчанию DBReader будет использовать столбец gp_segment_id для параллельного чтения данных. Каждый раздел DataFrame будет содержать данные конкретного сегмента Greenplum.
Это позволяет каждому исполнителю Spark читать только данные из определенного сегмента Greenplum, избегая перемещения больших объемов данных между сегментами.
Если используется представление, рекомендуется включить столбец gp_segment_id в это представление:
Чтение из представления со столбцом gp_segment_id
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE VIEW schema.view_with_gp_segment_id AS
SELECT
id,
some_column,
another_column,
gp_segment_id -- ВАЖНО
FROM schema.some_table
""",
)
reader = DBReader(
connection=greenplum,
source="schema.view_with_gp_segment_id",
)
df = reader.run()
Использование пользовательского partition_column
Иногда в таблице или представлении отсутствует столбец gp_segment_id, но есть некоторый столбец с диапазоном значений, коррелирующим с распределением сегмента Greenplum.
В этом случае вместо него можно использовать пользовательский столбец:
Чтение из представления с пользовательским partition_column
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE VIEW schema.view_with_partition_column AS
SELECT
id,
some_column,
part_column -- коррелирует с ID сегмента greenplum
FROM schema.some_table
""",
)
reader = DBReader(
connection=greenplum,
source="schema.view_with_partition_column",
options=Greenplum.ReadOptions(
# параллельное чтение данных с использованием указанного столбца
partitionColumn="part_column",
# создание 10 задач Spark, каждая будет читать только часть данных таблицы
partitions=10,
),
)
df = reader.run()
Чтение таблиц DISTRIBUTED REPLICATED
Реплицированные таблицы вообще не имеют столбца gp_segment_id, поэтому вам нужно установить partition_column на имя некоторого столбца типа integer/bigint/smallint.
Параллельное выполнение JOIN
В случае использования представлений, которые требуют перемещения данных между сегментами Greenplum, например запросов JOIN, следует использовать другой подход.
Каждый экзекутор Spark из N будет выполнять один и тот же запрос, поэтому каждый из N запросов запустит свой собственный процесс JOIN, что приведет к очень высокой нагрузке на сегменты Greenplum.
Этого следует избегать.
Вместо этого рекомендуется выполнить запрос JOIN на стороне Greenplum, сохранить результат во временной таблице, а затем прочитать эту таблицу с помощью DBReader:
Чтение из представления с использованием промежуточной таблицы
from onetl.connection import Greenplum
from onetl.db import DBReader
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE UNLOGGED TABLE schema.intermediate_table AS
SELECT
id,
tbl1.col1,
tbl1.data,
tbl2.another_data
FROM
schema.table1 as tbl1
JOIN
schema.table2 as tbl2
ON
tbl1.col1 = tbl2.col2
WHERE ...
""",
)
reader = DBReader(
connection=greenplum,
source="schema.intermediate_table",
)
df = reader.run()
# запись dataframe куда-либо
greenplum.execute(
"""
DROP TABLE schema.intermediate_table
""",
)
Warning
НИКОГДА не делайте так:
df1 = DBReader(connection=greenplum, target="public.table1", ...).run()
df2 = DBReader(connection=greenplum, target="public.table2", ...).run()
joined_df = df1.join(df2, on="col")
Это приведет к отправке всех данных из обеих таблиц table1 и table2 в память исполнителя Spark, а затем JOIN будет выполнен на стороне Spark, а не внутри Greenplum. Это ОЧЕНЬ неэффективно.
Примечание о таблицах TEMPORARY
Кто-то может подумать, что запись данных из представления или результата JOIN во временную таблицу (TEMPORARY), а затем передача ее в DBReader, является эффективным способом чтения данных из Greenplum. Это потому, что временные таблицы не генерируют файлы WAL и автоматически удаляются после завершения транзакции.
Это НЕ сработает. Каждый экзекутор Spark устанавливает свое собственное соединение с Greenplum. И каждое соединение начинает свою собственную транзакцию, что означает, что каждый исполнитель будет читать пустую временную таблицу.
Вам следует использовать таблицы UNLOGGED для записи данных во временную таблицу без генерации журналов WAL.
Параметры
GreenplumReadOptions
onetl.connection.db_connection.greenplum.options.GreenplumReadOptions
Bases: JDBCOptions
VMware's Greenplum Spark connector reading options.
.. warning::
Some options, like ``url``, ``dbtable``, ``server.*``, ``pool.*``,
etc are populated from connection attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.
Examples
.. note ::
You can pass any value
`supported by connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/read_from_gpdb.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!
The set of supported options depends on connector version.
.. code:: python
from onetl.connection import Greenplum
options = Greenplum.ReadOptions(
partitionColumn="reg_id",
partitions=10,
)
Source code in onetl/connection/db_connection/greenplum/options.py
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | |
num_partitions = Field(alias='partitions')
class-attribute
instance-attribute
Number of jobs created by Spark to read the table content in parallel.
See documentation for :obj:~partition_column for more details
.. warning::
By default connector uses number of segments in the Greenplum cluster.
You should not change this option, unless you know what you're doing
.. warning::
Both options :obj:`~partition_column` and :obj:`~num_partitions` should have a value,
or both should be ``None``
partition_column = Field(alias='partitionColumn')
class-attribute
instance-attribute
Column used to parallelize reading from a table.
.. warning::
You should not change this option, unless you know what you're doing.
It's preferable to use default values to read data parallel by number of segments in Greenplum cluster.
Possible values
-
None(default): Spark generates N jobs (where N == number of segments in Greenplum cluster), each job is reading only data from a specific segment (filtering data bygp_segment_idcolumn).This is very effective way to fetch the data from a cluster.
-
table column Allocate each executor a range of values from a specific column.
Spark generates for each executor an SQL query:
Executor 1:
.. code:: sql
SELECT ... FROM table WHERE (partition_column >= lowerBound OR partition_column IS NULL) AND partition_column < (lower_bound + stride)Executor 2:
.. code:: sql
SELECT ... FROM table WHERE partition_column >= (lower_bound + stride) AND partition_column < (lower_bound + 2 * stride)...
Executor N:
.. code:: sql
SELECT ... FROM table WHERE partition_column >= (lower_bound + (N-1) * stride) AND partition_column <= upper_boundWhere
stride=(upper_bound - lower_bound) / num_partitions,lower_bound=MIN(partition_column),upper_bound=MAX(partition_column)... note::
Column type must be numeric. Other types are not supported... note::
:obj:`~num_partitions` is used just to calculate the partition stride, **NOT** for filtering the rows in table. So all rows in the table will be returned (unlike *Incremental* :ref:`strategy`)... note::
All queries are executed in parallel. To execute them sequentially, use *Batch* :ref:`strategy`.
.. warning::
Both options :obj:`~partition_column` and :obj:`~num_partitions` should have a value,
or both should be ``None``
Examples
Read data in 10 parallel jobs by range of values in id_column column:
.. code:: python
Greenplum.ReadOptions(
partitionColumn="id_column",
partitions=10,
)