Skip to content

Предварительные требования

Совместимость версий

  • Версии сервера Greenplum:
  • Официально заявлено: 5.x, 6.x и 7.x (для которых требуется Greenplum.get_packages(package_version="2.3.0") или выше)
  • Фактически протестировано: 6.23, 7.0
  • Версии Spark: 3.2.x (Spark 3.3+ is not supported yet)
  • Версии Java: 8 - 11

Смотри официальную документацию.

Установите PySpark

Для использования коннектора Greenplum необходимо установить PySpark (или добавить его в sys.path) ДО создания экземпляра коннектора.

Смотри [инструкцию по установке][DBR-onetl-install-spark] для получения более подробной информации.

Скачайте пакет VMware

Для использования коннектора Greenplum необходимо загрузить файл .jar коннектора с веб-сайта VMware а затем передать его в сессию Spark.

Warning

Пожалуйста, обратите внимание на [совместимость версий Spark и Scala][DBR-onetl-install-spark-compatibility-matrix].

Warning

При использовании пакета версии 2.3.0/2.3.1 с Greenplum 6.x возникают проблемы: коннектор может открыть транзакцию с запросом SELECT * FROM table LIMIT 0, но не закрыть её, что приводит к взаимоблокировкам во время записи.

Существует несколько способов это сделать. Подробности см. в разделе [установка пакетов Java][DBR-onetl-install-spark-injecting-java-packages].

Note

Если вы загружаете пакет в частный репозиторий пакетов, используйте groupId=io.pivotal и artifactoryId=greenplum-spark_2.12

2.12 — это версия Scala), чтобы присвоить загруженному пакету корректное имя.

Interaction Spark ↔ Greenplum

Этот коннектор очень отличается от обычного коннектора Postgres.

Коннектор Postgres подключается напрямую к хосту Postgres через драйвер JDBC:

  • Драйвер Spark → хост Postgres (получение имен и типов столбцов запроса, создание целевой таблицы)
  • Исполнители Spark → хост Postgres (отправка/получение фактических данных)

Данные НИКОГДА не следует отправлять через главный сервер Greenplum (координатор) с использованием обычного коннектора Postgres, так как очень легко перегрузить координатор, отправив сотни и тысячи гигабайт данных.

Вместо этого коннектор Greenplum использует протокол gpfdist с несколько сложной схемой:

  • Драйвер Spark → Мастер Greenplum (получение имен и типов столбцов запроса, создание целевой таблицы)
  • Исполнители Spark → Мастер Greenplum (создание ВНЕШНИХ ТАБЛИЦ)
  • Сегменты Greenplum → Исполнители Spark (Отправка/получение фактических данных через ВНЕШНЮЮ ТАБЛИЦУ)

Более подробную информацию можно найти в официальной документации.

Настройка коннектора

Каждый исполнитель Spark запускает сервер gpfdist, и каждый сегмент Greenplum подключается к этому серверу. Сегмент Greenplum должен знать IP-адрес/имя хоста сервера и номер порта.

Этот целевой IP-адрес и диапазон портов должны быть добавлены в правило брандмауэра ALLOW на хосте/кластере Spark с исходным IP-адресом = сеть Greenplum. В противном случае соединение не может быть установлено.

Более подробную информацию можно найти в официальной документации:

spark.master=local

Установка хоста сервера gpfdist

По умолчанию коннектор Greenplum пытается определить IP-адрес текущего хоста, а затем передать его сегменту Greenplum. На некоторых хостах это работает как есть, без дополнительной конфигурации. На других — нет.

Наиболее распространенная ошибка заключается в том, что сегмент Greenplum получает IP-адрес 127.0.0.1 (интерфейс обратной связи). Обычно это вызвано содержимым /etc/hosts следующего вида:

127.0.0.1 localhost real-host-name
$ hostname -f
localhost

$ hostname -i
127.0.0.1

При попытке чтения/записи данных в Greenplum возникнет следующая ошибка:

org.postgresql.util.PSQLException: ERROR: connection with gpfdist failed for
"gpfdist://127.0.0.1:49152/local-1709739764667/exec/driver",
effective url: "http://127.0.0.1:49152/local-1709739764667/exec/driver":
error code = 111 (Connection refused);  (seg3 slice1 12.34.56.78:10003 pid=123456)

Существует два способа исправления этого:

  • Передайте IP-адрес вашего хоста коннектору явно, например, так:
    import os
    
    # host IP, accessible from GP segments
    os.environ["SPARK_LOCAL_IP"] = "192.168.1.1"
    
    # !!!SET IP BEFORE CREATING SPARK SESSION!!!
    spark = ...
    
    greenplum = Greenplum(
        ...,
        extra={
            # connector will read IP from this environment variable
            "server.hostEnv": "env.SPARK_LOCAL_IP",
        },
        spark=spark,
    )
    

Более подробная информация в официальной документации.

  • Обновите файл /etc/hosts, указав в нем реальный IP-адрес хоста:

    127.0.0.1 localhost
    # this IP should be accessible from GP segments
    192.168.1.1 real-host-name
    

Для этого требуются права root на хосте, не каждый может это сделать. Кроме того, это не работает с динамическими IP-адресами.

Установите порт сервера gpfdist.

По умолчанию исполнители Spark могут запускать сервер gpfdist на любом случайном номере порта. Вы можете ограничить диапазон портов с помощью опции extra:

greenplum = Greenplum(
    ...,
    extra={
        "server.port": "41000-42000",  # !!! JUST AN EXAMPLE !!!
    },
)

Количество портов в этом диапазоне должно быть как минимум равно «количество параллельно запущенных сессий Spark на хосте» * «количество исполнителей на сессию».

spark.master=yarn

Установите хост сервера gpfdist

По умолчанию коннектор Greenplum пытается определить текущий IP-адрес хоста, а затем передать его сегменту Greenplum. Обычно с этим проблем не возникает, коннектор просто работает как есть, без каких-либо настроек.

Наиболее распространенная ошибка заключается в том, что сегмент Greenplum получает IP-адрес 127.0.0.1 (интерфейс обратной связи) вместо внешнего IP-адреса узла данных/вычислений Hadoop. Есть 3 способа это исправить:

  • Передайте сегменту Greenplum имя хоста узла вместо IP-адреса:

    greenplum = Greenplum(
        ...,
        extra={
            "server.useHostname": "true",
        },
    )
    

Для этого может потребоваться настройка DNS на каждом сегменте Greenplum для корректного разрешения имени хоста узла Hadoop в зависимости от IP-адреса.

Более подробная информация в официальной документации.

  • Укажите имя сетевого интерфейса, с которого будет получен IP-адрес:

    greenplum = Greenplum(
        ...,
        extra={
            "server.nic": "eth0",
        },
    )
    

Список сетевых интерфейсов можно получить с помощью этой команды.

Note

Эту команду следует выполнять на узле кластера Hadoop, а не на хосте драйвера Spark!

$ ip address
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    inet 127.0.0.1/8 scope host lo
    valid_lft forever preferred_lft forever
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
    inet 192.168.1.1/24 brd 192.168.1.255 scope global dynamic noprefixroute eth0
    valid_lft 83457sec preferred_lft 83457sec

Обратите внимание, что в этом случае каждый узел кластера Hadoop должен иметь сетевой интерфейс с именем eth0, что может быть не так.

Больше информации в официальной документации.

  • Обновите файл /etc/hosts на каждом узле кластера Hadoop, добавив его IP-адрес:

    127.0.0.1 localhost
    # this IP should be accessible from GP segments
    192.168.1.1 real-host-name
    

Для этого требуются права root на хосте, не каждый может это сделать. Кроме того, это не работает с динамическими IP-адресами.

Установите порт сервера gpfdist.

По умолчанию исполнители Spark могут запускать сервер gpfdist на любом случайном номере порта. Вы можете ограничить диапазон портов с помощью опции extra:

greenplum = Greenplum(
    ...,
    extra={
        "server.port": "41000-42000",  # !!! JUST AN EXAMPLE !!!
    },
)

Количество портов в этом диапазоне должно быть не менее, равно: количество параллельно запущенных сессий Spark на узел * количество исполнителей на сессию / количество узлов Hadoop.

spark.master=k8s

Перед запуском сессии Spark необходимо создать объект Kubernetes Ingress:

ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
    name: gpfdist-ingress
    namespace: mynamespace
    annotations:
        nginx.ingress.kubernetes.io/ssl-redirect: "false"
        nginx.ingress.kubernetes.io/force-ssl-redirect: "false"
spec:
    rules:
    - http:
        paths:
        - path: /
          pathType: Prefix
          backend:
            service:
                name: gpfdist-default
                port:
                number: 50000

## Возвращено из ответа API K8S ##
# status:
#     loadBalancer:
#         ingress:
#             - ip: 11.22.33.44

Затем добавьте специальный слушатель Spark в конфигурацию сессии Spark и укажите IP-адрес или доменное имя балансировщика нагрузки ingress с номером порта:

spark = (
    SparkSession.builder.config("spark.master", "k8s://...")
    .config("spark.extraListeners", "org.greenplum.GpfdistIngressListener")
    .config("spark.kubernetes.namespace", "mynamespace")
    .config("spark.greenplum.k8s.ingress.name", "gpfdist-ingress")  # ingress name
    .config("spark.greenplum.gpfdist.host", "11.22.33.44")  # ingress IP/domain name
    .config("spark.greenplum.gpfdist.listen-port", "50000")  # ingress port
    .config(
        "spark.greenplum.gpfdist.is-ssl", "false"
    )  # true for ingress with TLS enabled
).getOrCreate()

Установите фиксированный порт для прослушивания сервером gpfdist:

greenplum = Greenplum(
    ...,
    extra={
        "server.port": "50000",  # should match ingress port
    },
)

Установка количества соединений

Warning

Это очень важно!!!

Если вы не ограничите количество соединений, вы можете превысить лимит max_connections, установленный на стороне Greenplum. Обычно он не так высок, например, максимум 500-1000 соединений, в зависимости от настроек вашего экземпляра Greenplum и использования балансировщиков соединений, таких как pgbouncer.

Использование всех доступных соединений означает, что никто (даже администраторы) не сможет подключиться к Greenplum!

Каждая задача, запущенная на исполнителе Spark, устанавливает собственное соединение с главным узлом Greenplum. Чтобы избежать открытия слишком большого количества соединений с главным узлом Greenplum (координатором), следует ограничить количество задач.

  • Для чтения данных объемом 5-10 ГБ требуется около 3-5 параллельных соединений.
  • Для чтения данных объемом 20-30 ГБ требуется около 5-10 параллельных соединений.
  • Для чтения данных объемом 50 ГБ требуется примерно 10-20 параллельных соединений.
  • Для чтения данных объемом более 100 ГБ требуется 20-30 параллельных соединений.
  • Открытие более 30-50 соединений не рекомендуется.

Максимальное количество параллельных задач составляет N исполнителей * N ядер на исполнитель, поэтому это значение можно настроить с помощью конфигурации сессии Spark:

spark = (
    SparkSession.builder
    # Spark will run with 5 threads in local mode, allowing up to 5 parallel tasks
    .config("spark.master", "local[5]")
).getOrCreate()

# Set connection pool size AT LEAST to number of executors + 1 for driver
Greenplum(
    ...,
    extra={
        "pool.maxSize": 6,  # 5 executors + 1 driver
    },
)
spark = (
    SparkSession.builder
    .config("spark.master", "yarn")
    # Spark will start MAX 10 executors with 1 core each (dynamically), so max number of parallel jobs is 10
    .config("spark.dynamicAllocation.maxExecutors", 10)
    .config("spark.executor.cores", 1)
).getOrCreate()
spark = (
    SparkSession.builder
    .config("spark.master", "yarn")
    # Spark will start EXACTLY 10 executors with 1 core each, so max number of parallel jobs is 10
    .config("spark.executor.instances", 10)
    .config("spark.executor.cores", 1)
).getOrCreate()

Смотри документацию по объединению соединений.

Боковые регулировки Greenplum

Разрешить подключение к главному серверу Greenplum

Попросите администратора кластера Greenplum разрешить вашему пользователю подключаться к главному серверу (координатору) Greenplum, например, обновив файл pg_hba.conf.

Более подробную информацию можно найти в официальной документации.

Предоставить необходимые гранты

Попросите администратора кластера Greenplum установить следующие права доступа для пользователя:

-- get access to get tables metadata & cluster information
GRANT SELECT ON information_schema.tables TO username;
GRANT SELECT ON pg_attribute TO username;
GRANT SELECT ON pg_class TO username;
GRANT SELECT ON pg_namespace TO username;
GRANT SELECT ON pg_settings TO username;
GRANT SELECT ON pg_stats TO username;
GRANT SELECT ON gp_distributed_xacts TO username;
GRANT SELECT ON gp_segment_configuration TO username;
-- Greenplum 5.x only
GRANT SELECT ON gp_distribution_policy TO username;

-- allow creating external tables in the same schema as source/target table
GRANT USAGE ON SCHEMA myschema TO username;
GRANT CREATE ON SCHEMA myschema TO username;
ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

-- allow read access to specific table (to get column types)
-- allow write access to specific table
GRANT SELECT, INSERT ON myschema.mytable TO username;
-- get access to get tables metadata & cluster information
GRANT SELECT ON information_schema.tables TO username;
GRANT SELECT ON pg_attribute TO username;
GRANT SELECT ON pg_class TO username;
GRANT SELECT ON pg_namespace TO username;
GRANT SELECT ON pg_settings TO username;
GRANT SELECT ON pg_stats TO username;
GRANT SELECT ON gp_distributed_xacts TO username;
GRANT SELECT ON gp_segment_configuration TO username;
-- Greenplum 5.x only
GRANT SELECT ON gp_distribution_policy TO username;

-- allow creating external tables in the same schema as source table
GRANT USAGE ON SCHEMA schema_to_read TO username;
GRANT CREATE ON SCHEMA schema_to_read TO username;
-- yes, `writable` for reading from GP, because data is written from Greenplum to Spark executor.
ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

-- allow read access to specific table
GRANT SELECT ON schema_to_read.table_to_read TO username;

Более подробную информацию можно найти в официальной документации.