Предварительные требования
Совместимость версий
- Версии сервера Greenplum:
- Официально заявлено: 5.x, 6.x и 7.x (для которых требуется
Greenplum.get_packages(package_version="2.3.0")или выше) - Фактически протестировано: 6.23, 7.0
- Версии Spark: 3.2.x (Spark 3.3+ is not supported yet)
- Версии Java: 8 - 11
Смотри официальную документацию.
Установите PySpark
Для использования коннектора Greenplum необходимо установить PySpark (или добавить его в sys.path)
ДО создания экземпляра коннектора.
Смотри [инструкцию по установке][DBR-onetl-install-spark] для получения более подробной информации.
Скачайте пакет VMware
Для использования коннектора Greenplum необходимо загрузить файл .jar коннектора с
веб-сайта VMware
а затем передать его в сессию Spark.
Warning
Пожалуйста, обратите внимание на [совместимость версий Spark и Scala][DBR-onetl-install-spark-compatibility-matrix].
Warning
При использовании пакета версии 2.3.0/2.3.1 с Greenplum 6.x возникают проблемы: коннектор может
открыть транзакцию с запросом SELECT * FROM table LIMIT 0, но не закрыть её, что приводит к взаимоблокировкам
во время записи.
Существует несколько способов это сделать. Подробности см. в разделе [установка пакетов Java][DBR-onetl-install-spark-injecting-java-packages].
Note
Если вы загружаете пакет в частный репозиторий пакетов, используйте groupId=io.pivotal и artifactoryId=greenplum-spark_2.12
2.12 — это версия Scala), чтобы присвоить загруженному пакету корректное имя.
Interaction Spark ↔ Greenplum
Этот коннектор очень отличается от обычного коннектора Postgres.
Коннектор Postgres подключается напрямую к хосту Postgres через драйвер JDBC:
- Драйвер Spark → хост Postgres (получение имен и типов столбцов запроса, создание целевой таблицы)
- Исполнители Spark → хост Postgres (отправка/получение фактических данных)
Данные НИКОГДА не следует отправлять через главный сервер Greenplum (координатор) с использованием обычного коннектора Postgres, так как очень легко перегрузить координатор, отправив сотни и тысячи гигабайт данных.
Вместо этого коннектор Greenplum использует протокол gpfdist с несколько сложной схемой:
- Драйвер Spark → Мастер Greenplum (получение имен и типов столбцов запроса, создание целевой таблицы)
- Исполнители Spark → Мастер Greenplum (создание ВНЕШНИХ ТАБЛИЦ)
- Сегменты Greenplum → Исполнители Spark (Отправка/получение фактических данных через
ВНЕШНЮЮ ТАБЛИЦУ)
Более подробную информацию можно найти в официальной документации.
Настройка коннектора
Каждый исполнитель Spark запускает сервер gpfdist, и каждый сегмент Greenplum подключается к этому серверу.
Сегмент Greenplum должен знать IP-адрес/имя хоста сервера и номер порта.
Этот целевой IP-адрес и диапазон портов должны быть добавлены в правило брандмауэра ALLOW на хосте/кластере Spark с исходным IP-адресом = сеть Greenplum.
В противном случае соединение не может быть установлено.
Более подробную информацию можно найти в официальной документации:
spark.master=local
Установка хоста сервера gpfdist
По умолчанию коннектор Greenplum пытается определить IP-адрес текущего хоста, а затем передать его сегменту Greenplum. На некоторых хостах это работает как есть, без дополнительной конфигурации. На других — нет.
Наиболее распространенная ошибка заключается в том, что сегмент Greenplum получает IP-адрес 127.0.0.1 (интерфейс обратной связи).
Обычно это вызвано содержимым /etc/hosts следующего вида:
127.0.0.1 localhost real-host-name
$ hostname -f
localhost
$ hostname -i
127.0.0.1
При попытке чтения/записи данных в Greenplum возникнет следующая ошибка:
org.postgresql.util.PSQLException: ERROR: connection with gpfdist failed for
"gpfdist://127.0.0.1:49152/local-1709739764667/exec/driver",
effective url: "http://127.0.0.1:49152/local-1709739764667/exec/driver":
error code = 111 (Connection refused); (seg3 slice1 12.34.56.78:10003 pid=123456)
Существует два способа исправления этого:
- Передайте IP-адрес вашего хоста коннектору явно, например, так:
import os # host IP, accessible from GP segments os.environ["SPARK_LOCAL_IP"] = "192.168.1.1" # !!!SET IP BEFORE CREATING SPARK SESSION!!! spark = ... greenplum = Greenplum( ..., extra={ # connector will read IP from this environment variable "server.hostEnv": "env.SPARK_LOCAL_IP", }, spark=spark, )
Более подробная информация в официальной документации.
-
Обновите файл
/etc/hosts, указав в нем реальный IP-адрес хоста:127.0.0.1 localhost # this IP should be accessible from GP segments 192.168.1.1 real-host-name
Для этого требуются права root на хосте, не каждый может это сделать. Кроме того, это не работает с динамическими IP-адресами.
Установите порт сервера gpfdist.
По умолчанию исполнители Spark могут запускать сервер gpfdist на любом случайном номере порта.
Вы можете ограничить диапазон портов с помощью опции extra:
greenplum = Greenplum(
...,
extra={
"server.port": "41000-42000", # !!! JUST AN EXAMPLE !!!
},
)
Количество портов в этом диапазоне должно быть как минимум равно «количество параллельно запущенных сессий Spark на хосте» * «количество исполнителей на сессию».
spark.master=yarn
Установите хост сервера gpfdist
По умолчанию коннектор Greenplum пытается определить текущий IP-адрес хоста, а затем передать его сегменту Greenplum. Обычно с этим проблем не возникает, коннектор просто работает как есть, без каких-либо настроек.
Наиболее распространенная ошибка заключается в том, что сегмент Greenplum получает IP-адрес 127.0.0.1 (интерфейс обратной связи)
вместо внешнего IP-адреса узла данных/вычислений Hadoop. Есть 3 способа это исправить:
-
Передайте сегменту Greenplum имя хоста узла вместо IP-адреса:
greenplum = Greenplum( ..., extra={ "server.useHostname": "true", }, )
Для этого может потребоваться настройка DNS на каждом сегменте Greenplum для корректного разрешения имени хоста узла Hadoop в зависимости от IP-адреса.
Более подробная информация в официальной документации.
-
Укажите имя сетевого интерфейса, с которого будет получен IP-адрес:
greenplum = Greenplum( ..., extra={ "server.nic": "eth0", }, )
Список сетевых интерфейсов можно получить с помощью этой команды.
Note
Эту команду следует выполнять на узле кластера Hadoop, а не на хосте драйвера Spark!
$ ip address
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
inet 192.168.1.1/24 brd 192.168.1.255 scope global dynamic noprefixroute eth0
valid_lft 83457sec preferred_lft 83457sec
Обратите внимание, что в этом случае каждый узел кластера Hadoop должен иметь сетевой интерфейс с именем eth0,
что может быть не так.
Больше информации в официальной документации.
-
Обновите файл
/etc/hostsна каждом узле кластера Hadoop, добавив его IP-адрес:127.0.0.1 localhost # this IP should be accessible from GP segments 192.168.1.1 real-host-name
Для этого требуются права root на хосте, не каждый может это сделать. Кроме того, это не работает с динамическими IP-адресами.
Установите порт сервера gpfdist.
По умолчанию исполнители Spark могут запускать сервер gpfdist на любом случайном номере порта.
Вы можете ограничить диапазон портов с помощью опции extra:
greenplum = Greenplum(
...,
extra={
"server.port": "41000-42000", # !!! JUST AN EXAMPLE !!!
},
)
Количество портов в этом диапазоне должно быть не менее, равно: количество параллельно запущенных сессий Spark на узел * количество исполнителей на сессию / количество узлов Hadoop.
spark.master=k8s
Перед запуском сессии Spark необходимо создать объект Kubernetes Ingress:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: gpfdist-ingress
namespace: mynamespace
annotations:
nginx.ingress.kubernetes.io/ssl-redirect: "false"
nginx.ingress.kubernetes.io/force-ssl-redirect: "false"
spec:
rules:
- http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: gpfdist-default
port:
number: 50000
## Возвращено из ответа API K8S ##
# status:
# loadBalancer:
# ingress:
# - ip: 11.22.33.44
Затем добавьте специальный слушатель Spark в конфигурацию сессии Spark и укажите IP-адрес или доменное имя балансировщика нагрузки ingress с номером порта:
spark = (
SparkSession.builder.config("spark.master", "k8s://...")
.config("spark.extraListeners", "org.greenplum.GpfdistIngressListener")
.config("spark.kubernetes.namespace", "mynamespace")
.config("spark.greenplum.k8s.ingress.name", "gpfdist-ingress") # ingress name
.config("spark.greenplum.gpfdist.host", "11.22.33.44") # ingress IP/domain name
.config("spark.greenplum.gpfdist.listen-port", "50000") # ingress port
.config(
"spark.greenplum.gpfdist.is-ssl", "false"
) # true for ingress with TLS enabled
).getOrCreate()
Установите фиксированный порт для прослушивания сервером gpfdist:
greenplum = Greenplum(
...,
extra={
"server.port": "50000", # should match ingress port
},
)
Установка количества соединений
Warning
Это очень важно!!!
Если вы не ограничите количество соединений, вы можете превысить лимит max_connections, установленный на стороне Greenplum. Обычно он не так высок, например, максимум 500-1000 соединений,
в зависимости от настроек вашего экземпляра Greenplum и использования балансировщиков соединений, таких как pgbouncer.
Использование всех доступных соединений означает, что никто (даже администраторы) не сможет подключиться к Greenplum!
Каждая задача, запущенная на исполнителе Spark, устанавливает собственное соединение с главным узлом Greenplum. Чтобы избежать открытия слишком большого количества соединений с главным узлом Greenplum (координатором), следует ограничить количество задач.
- Для чтения данных объемом 5-10 ГБ требуется около 3-5 параллельных соединений.
- Для чтения данных объемом 20-30 ГБ требуется около 5-10 параллельных соединений.
- Для чтения данных объемом 50 ГБ требуется примерно 10-20 параллельных соединений.
- Для чтения данных объемом более 100 ГБ требуется 20-30 параллельных соединений.
- Открытие более 30-50 соединений не рекомендуется.
Максимальное количество параллельных задач составляет N исполнителей * N ядер на исполнитель, поэтому это значение можно настроить с помощью конфигурации сессии Spark:
spark = (
SparkSession.builder
# Spark will run with 5 threads in local mode, allowing up to 5 parallel tasks
.config("spark.master", "local[5]")
).getOrCreate()
# Set connection pool size AT LEAST to number of executors + 1 for driver
Greenplum(
...,
extra={
"pool.maxSize": 6, # 5 executors + 1 driver
},
)
spark = (
SparkSession.builder
.config("spark.master", "yarn")
# Spark will start MAX 10 executors with 1 core each (dynamically), so max number of parallel jobs is 10
.config("spark.dynamicAllocation.maxExecutors", 10)
.config("spark.executor.cores", 1)
).getOrCreate()
spark = (
SparkSession.builder
.config("spark.master", "yarn")
# Spark will start EXACTLY 10 executors with 1 core each, so max number of parallel jobs is 10
.config("spark.executor.instances", 10)
.config("spark.executor.cores", 1)
).getOrCreate()
Смотри документацию по объединению соединений.
Боковые регулировки Greenplum
Разрешить подключение к главному серверу Greenplum
Попросите администратора кластера Greenplum разрешить вашему пользователю подключаться к главному серверу (координатору) Greenplum,
например, обновив файл pg_hba.conf.
Более подробную информацию можно найти в официальной документации.
Предоставить необходимые гранты
Попросите администратора кластера Greenplum установить следующие права доступа для пользователя:
-- get access to get tables metadata & cluster information
GRANT SELECT ON information_schema.tables TO username;
GRANT SELECT ON pg_attribute TO username;
GRANT SELECT ON pg_class TO username;
GRANT SELECT ON pg_namespace TO username;
GRANT SELECT ON pg_settings TO username;
GRANT SELECT ON pg_stats TO username;
GRANT SELECT ON gp_distributed_xacts TO username;
GRANT SELECT ON gp_segment_configuration TO username;
-- Greenplum 5.x only
GRANT SELECT ON gp_distribution_policy TO username;
-- allow creating external tables in the same schema as source/target table
GRANT USAGE ON SCHEMA myschema TO username;
GRANT CREATE ON SCHEMA myschema TO username;
ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
-- allow read access to specific table (to get column types)
-- allow write access to specific table
GRANT SELECT, INSERT ON myschema.mytable TO username;
-- get access to get tables metadata & cluster information
GRANT SELECT ON information_schema.tables TO username;
GRANT SELECT ON pg_attribute TO username;
GRANT SELECT ON pg_class TO username;
GRANT SELECT ON pg_namespace TO username;
GRANT SELECT ON pg_settings TO username;
GRANT SELECT ON pg_stats TO username;
GRANT SELECT ON gp_distributed_xacts TO username;
GRANT SELECT ON gp_segment_configuration TO username;
-- Greenplum 5.x only
GRANT SELECT ON gp_distribution_policy TO username;
-- allow creating external tables in the same schema as source table
GRANT USAGE ON SCHEMA schema_to_read TO username;
GRANT CREATE ON SCHEMA schema_to_read TO username;
-- yes, `writable` for reading from GP, because data is written from Greenplum to Spark executor.
ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
-- allow read access to specific table
GRANT SELECT ON schema_to_read.table_to_read TO username;
Более подробную информацию можно найти в официальной документации.