Соответствие типов Greenplum <-> Spark
Note
Результаты ниже действительны для Spark 3.2.4 и могут отличаться в других версиях Spark.
Определение типов и приведение
DataFrame в Spark всегда имеют schema, которая представляет собой список столбцов с соответствующими типами Spark. Все операции над столбцом выполняются с использованием типа столбца.
Чтение из Greenplum
Коннектор Greenplum выполняет следующие действия:
- Выполняет запрос
SELECT * FROM table LIMIT 0[^1]. - Для каждого столбца в результате запроса получает имя столбца и тип Greenplum.
- Находит соответствующую комбинацию
Тип Greenplum (чтение)→Тип Spark(см. ниже) для каждого столбца DataFrame. Если комбинация не найдена, выбрасывает исключение. - Использует функции проекции столбцов Spark и предикаты pushdown для построения финального запроса.
- Создает DataFrame из сгенерированного запроса с выведенной схемой.
[^1]: Да, все столбцы таблицы, а не только выбранные. Это означает, что если исходная таблица содержит столбцы с неподдерживаемым типом, всю таблицу невозможно прочитать.
Запись в существующую таблицу Greenplum
Коннектор Greenplum выполняет следующие действия:
- Получает имена столбцов в DataFrame.
- Выполняет запрос
SELECT * FROM table LIMIT 0. - Для каждого столбца в результате запроса получает имя столбца и тип Greenplum.
- Сопоставляет столбцы таблицы со столбцами DataFrame (по имени, без учета регистра).
Если какой-то столбец присутствует только в целевой таблице, но не в DataFrame (например, столбцы с
DEFAULTилиSERIAL), и наоборот, выбрасывает исключение. См. Явное приведение типов. - Находит соответствующую комбинацию
Тип Spark→Тип Greenplum (запись)(см. ниже) для каждого столбца DataFrame. Если комбинация не найдена, выбрасывает исключение. - Если
Тип Greenplum (запись)совпадает сТип Greenplum (чтение), дополнительные преобразования не выполняются, столбец DataFrame записывается в Greenplum как есть. - Если
Тип Greenplum (запись)не совпадает сТип Greenplum (чтение), столбец DataFrame будет приведен к типу целевого столбца на стороне Greenplum. Например, вы можете записать столбец с текстовыми данными в столбецjson, который коннектор Greenplum в настоящее время не поддерживает.
Создание новой таблицы с помощью Spark
Warning
КАТЕГОРИЧЕСКИ НЕ РЕКОМЕНДУЕТСЯ!
Коннектор Greenplum выполняет следующие действия:
- Находит соответствующую комбинацию
Тип Spark→Тип Greenplum (создание)(см. ниже) для каждого столбца DataFrame. Если комбинация не найдена, выбрасывает исключение. - Генерирует DDL для создания таблицы в Greenplum, например
CREATE TABLE (col1 ...), и выполняет его. - Записывает DataFrame в созданную таблицу как есть.
Более подробную информацию можно найти здесь.
Но коннектор Greenplum поддерживает только ограниченное количество типов и почти не поддерживает пользовательские выражения (такие как PARTITION BY).
Поэтому вместо того, чтобы полагаться на Spark для создания таблиц:
Посмотреть пример
writer = DBWriter(
connection=greenplum,
target="public.table",
options=Greenplum.WriteOptions(
if_exists="append",
# по умолчанию распределение случайное
distributedBy="id",
# partitionBy не поддерживается
),
)
writer.run(df)
Всегда предпочитайте создавать таблицу с нужным DDL ПЕРЕД ЗАПИСЬЮ ДАННЫХ:
Посмотреть пример
greenplum.execute(
"""
CREATE TABLE public.table (
id int32,
business_dt timestamp(6),
value json
)
PARTITION BY RANGE (business_dt)
DISTRIBUTED BY id
""",
)
writer = DBWriter(
connection=greenplum,
target="public.table",
options=Greenplum.WriteOptions(if_exists="append"),
)
writer.run(df)
См. документацию Greenplum CREATE TABLE.
Поддерживаемые типы
См.:
Числовые типы
| Тип Greenplum (чтение) | Тип Spark | Тип Greenplum (запись) | Тип Greenplum (создание) |
|---|---|---|---|
decimaldecimal(P=0..38)decimal(P=0..38, S=0..38) |
DecimalType(P=38, S=18)DecimalType(P=0..38, S=0)DecimalType(P=0..38, S=0..38) |
decimal(P=38, S=18)decimal(P=0..38, S=0)decimal(P=0..38, S=0..38) |
decimal (неограниченный) |
decimal(P=39.., S=0..) |
не поддерживается [^2] | ||
real |
FloatType() |
real |
real |
double precision |
DoubleType() |
double precision |
double precision |
- |
ByteType() |
не поддерживается | не поддерживается |
smallint |
ShortType() |
smallint |
smallint |
integer |
IntegerType() |
integer |
integer |
bigint |
LongType() |
bigint |
bigint |
moneyint4rangeint8rangenumrangeint2vector |
не поддерживается |
[^2]: Greenplum поддерживает десятичные типы с неограниченной точностью.
Но `DecimalType(P, S)` в Spark поддерживает максимум `P=38` (128 бит). Невозможно читать, записывать или оперировать значениями большей точности, это приводит к исключению.
Временные типы
| Тип Greenplum (чтение) | Тип Spark | Тип Greenplum (запись) | Тип Greenplum (создание) |
|---|---|---|---|
date |
DateType() |
date |
date |
timetime(0..6)time with time zonetime(0..6) with time zone |
TimestampType(), особенности форматирования времени [^3] |
timestamp |
timestamp |
timestamptimestamp(0..6)timestamp with time zonetimestamp(0..6) with time zone |
TimestampType() |
timestamp |
timestamp |
interval или любой точностиdaterangetsrangetstzrange |
не поддерживается |
Warning
Обратите внимание, что типы в Greenplum и Spark имеют разные диапазоны значений:
| Тип Greenplum | Минимальное значение | Максимальное значение | Тип Spark | Минимальное значение | Максимальное значение |
|---|---|---|---|---|---|
date |
-4713-01-01 |
5874897-01-01 |
DateType() |
0001-01-01 |
9999-12-31 |
timestamptime |
-4713-01-01 00:00:00.00000000:00:00.000000 |
294276-12-31 23:59:59.99999924:00:00.000000 |
TimestampType() |
0001-01-01 00:00:00.000000 |
9999-12-31 23:59:59.999999 |
Таким образом, не все значения могут быть считаны из Greenplum в Spark.
Ссылки:
[^3]: Тип time аналогичен timestamp с датой 1970-01-01. Поэтому вместо чтения данных из Postgres как 23:59:59
фактически читается 1970-01-01 23:59:59, и наоборот.
Строковые типы
| Тип Greenplum (чтение) | Тип Spark | Тип Greenplum (запись) | Тип Greenplum (создание) |
|---|---|---|---|
charactercharacter(N)character varyingcharacter varying(N)textxmlCREATE TYPE ... AS ENUM |
StringType() |
text |
text |
jsonjsonb |
не поддерживается |
Бинарные типы
| Тип Greenplum (чтение) | Тип Spark | Тип Greenplum (запись) | Тип Greenplum (создание) |
|---|---|---|---|
boolean |
BooleanType() |
boolean |
boolean |
bitbit(N)bit varyingbit varying(N) |
не поддерживается |
||
bytea |
не поддерживается [^4] | ||
- |
BinaryType() |
bytea |
bytea |
[^4]: Да, это странно.
Структурные типы
| Тип Greenplum (чтение) | Тип Spark | Тип Greenplum (запись) | Тип Greenplum (создание) |
|---|---|---|---|
T[] |
не поддерживается | ||
- |
ArrayType() |
не поддерживается | |
CREATE TYPE sometype (...) |
StringType() |
text |
text |
- |
StructType()MapType() |
не поддерживается |
Неподдерживаемые типы
Столбцы этих типов не могут быть прочитаны/записаны в Spark:
cidrinetmacaddrmacaddr8circleboxlinelsegpathpointpolygontsvectortsqueryuuid
Есть способ избежать этого - просто привести неподдерживаемые типы к text. Но способ, которым это можно сделать, не так прост.
Явное приведение типов
DBReader
Прямое приведение типов Greenplum не поддерживается DBReader из-за особенностей реализации коннектора.
reader = DBReader(
connection=greenplum,
# вызовет ошибку
columns=["CAST(unsupported_column AS text)"],
)
Но есть обходной путь - создать представление (view) с приведением неподдерживаемого столбца к тексту (или любому другому поддерживаемому типу).
Например, вы можете использовать функцию Postgres to_json для преобразования столбца любого типа в строковое представление, а затем анализировать этот столбец на стороне Spark с помощью метода JSON.parse_column.
from pyspark.sql.types import ArrayType, IntegerType
from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.file.format import JSON
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE VIEW schema.view_with_json_column AS
SELECT
id,
supported_column,
to_json(array_column) array_column_as_json,
gp_segment_id -- ! важно !
FROM
schema.table_with_unsupported_columns
""",
)
# создаем dataframe используя это представление
reader = DBReader(
connection=greenplum,
source="schema.view_with_json_column",
)
df = reader.run()
# Определяем схему для JSON-данных
json_scheme = ArrayType(IntegerType())
df = df.select(
df.id,
df.supported_column,
JSON().parse_column(df.array_column_as_json, json_scheme).alias("array_column"),
)
DBWriter
Для записи данных в столбец text или json в таблице Greenplum используйте метод JSON.serialize_column.
from onetl.connection import Greenplum
from onetl.db import DBWriter
from onetl.file.format import JSON
greenplum = Greenplum(...)
greenplum.execute(
"""
CREATE TABLE schema.target_table (
id int,
supported_column timestamp,
array_column_as_json jsonb, -- или text
)
DISTRIBUTED BY id
""",
)
write_df = df.select(
df.id,
df.supported_column,
JSON().serialize_column(df.array_column).alias("array_column_json"),
)
writer = DBWriter(
connection=greenplum,
target="schema.target_table",
)
writer.run(write_df)
Затем вы можете анализировать этот столбец на стороне Greenplum:
SELECT
id,
supported_column,
-- доступ к первому элементу массива
array_column_as_json->0
FROM
schema.target_table