Skip to content

Настройка Iceberg Catalog

Обзор Iceberg Catalog

Apache Iceberg Catalog предоставляет единую точку доступа к табличным данным в формате Iceberg, хранящимся в S3-совместимом объектном хранилище. В нашей платформе используется REST-реализация каталога.

Компоненты Iceberg Catalog

  • REST Catalog Service: proxy-adapter-rest-fixture.mwsdata.svc.cluster.local:8000
  • Хранилище: S3-совместимое объектное хранилище (Ozone)
  • Аутентификация: OAuth2 через Keycloak
  • Метаданные: Управление схемами, таблицами и версиями

Настройка Iceberg Catalog в Spark

Базовая конфигурация

import os
import socket
from pyspark.sql import SparkSession

# Базовые настройки
platform_namespace = "mwsdata"  # текущий неймспейс платформы
catalog_name = "wh"  # имя каталога

configuration = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": catalog_name,
    "spark.ssl.enabled": "false",

    # Настройки Iceberg REST Catalog
    f"spark.sql.catalog.{catalog_name}": "org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.{catalog_name}.type": "rest",
    f"spark.sql.catalog.{catalog_name}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    f"spark.sql.catalog.{catalog_name}.uri": f"http://proxy-adapter-rest-fixture.{platform_namespace}.svc.cluster.local:8000",

    # Настройки S3
    f"spark.sql.catalog.{catalog_name}.s3.endpoint": "http://s3balancer.company.local:9878",
    f"spark.sql.catalog.{catalog_name}.s3.access-key-id": "your_s3_access_key",
    f"spark.sql.catalog.{catalog_name}.s3.secret-access-key": "your_s3_secret_key",
    f"spark.sql.catalog.{catalog_name}.s3.path-style-access": "true",
    f"spark.sql.catalog.{catalog_name}.s3.region": "us-east-1",

    # Настройки OAuth2 аутентификации
    f"spark.sql.catalog.{catalog_name}.rest.auth.type": "com.dremio.iceberg.authmgr.oauth2.OAuth2Manager",
    f"spark.sql.catalog.{catalog_name}.rest.auth.oauth2.client-id": "keycloak_client_id",
    f"spark.sql.catalog.{catalog_name}.rest.auth.oauth2.client-secret": "keycloak_client_secret",
    f"spark.sql.catalog.{catalog_name}.rest.auth.oauth2.grant-type": "password",
    f"spark.sql.catalog.{catalog_name}.rest.auth.oauth2.resource-owner.username": "your_ldap_username",
    f"spark.sql.catalog.{catalog_name}.rest.auth.oauth2.resource-owner.password": "your_ldap_password",
    f"spark.sql.catalog.{catalog_name}.rest.auth.oauth2.issuer-url": "http://keycloak.mwsdata.svc.cluster.local:80/realms/mwsdata/",
}

# Создание Spark сессии
spark = SparkSession.builder \
    .config(map=configuration) \
    .getOrCreate()

Конфигурация для Kubernetes

import os
import socket

KUBERNETES_SERVICE_HOST = os.environ['KUBERNETES_SERVICE_HOST']
HOSTNAME = socket.gethostname()
HOST_IP = socket.gethostbyname(HOSTNAME)

configuration = {
    "spark.master": f"k8s://https://{KUBERNETES_SERVICE_HOST}:443",
    "spark.kubernetes.namespace": "your-namespace",
    "spark.kubernetes.authenticate.serviceAccountName": "spark",
    "spark.driver.host": HOST_IP,
    "spark.driver.memory": "2g",
    "spark.driver.cores": "1",
    "spark.executor.instances": "2",
    "spark.executor.memory": "2g",
    "spark.kubernetes.container.image": "registry.company.local/platform/docker-images/spark/spark3.5:1.0.0",

    # Добавить настройки Iceberg из примера выше
}

spark = SparkSession.builder \
    .config(map=configuration) \
    .getOrCreate()

Настройка Iceberg Catalog в StarRocks

Создание внешнего каталога

CREATE EXTERNAL CATALOG IcebergRest
PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "rest",
    "iceberg.catalog.uri" = "http://proxy-adapter-rest-fixture.mwsdata.svc.cluster.local:8000",
    "iceberg.catalog.io-impl" = "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.security" = "OAUTH2",

    -- Настройки S3
    "aws.s3.endpoint" = "http://s3balancer.company.local:9878",
    "aws.s3.access_key" = "your_s3_access_key",
    "aws.s3.secret_key" = "your_s3_secret_key",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.region" = "us-east-1",

    -- Настройки OAuth2
    "iceberg.catalog.oauth2-server-uri" = "http://keycloak.mwsdata.svc.cluster.local:80/realms/mwsdata/protocol/openid-connect/token",
    "iceberg.catalog.credential" = "client_id:client_secret",
    "iceberg.catalog.oauth2-token-refresh-enabled" = "true",

    "client.factory" = "com.starrocks.connector.iceberg.IcebergClientFactory"
);

Работа с каталогом в StarRocks

-- Использование каталога
SET CATALOG IcebergRest;

-- Создание базы данных (должно содержать имя тенанта)
CREATE DATABASE IF NOT EXISTS newtenant_analytics;

-- Использование базы данных
USE newtenant_analytics;

-- Создание таблицы
CREATE TABLE newtenant_analytics.sales_data (
    order_id BIGINT,
    customer_id INT,
    order_amount DECIMAL(10,2),
    order_date DATE
);

-- Вставка данных
INSERT INTO newtenant_analytics.sales_data VALUES
('US', 1, 123.45, '2024-01-01'),
('FR', 2, 67.89, '2024-01-02');

Получение учетных данных

Доступы к S3

Учетные данные для S3 можно получить:

  1. Через KubeSphere UI:
  2. Перейдите в раздел Secrets вашего тенанта
  3. Найдите секрет с именем ozone
  4. Значения:

    • s3_access_keyspark.sql.catalog.{catalog_name}.s3.access-key-id
    • s3_secret_keyspark.sql.catalog.{catalog_name}.s3.secret-access-key
  5. Через письмо при онбординге команды

OAuth2 учетные данные

Учетные данные для Keycloak:

  1. Client ID/Secret:
  2. В KubeSphere: секрет keycloak в неймспейсе тенанта
  3. Один клиент для всех пользователей тенанта

  4. Пользовательские учетные данные:

  5. Личный LDAP логин и пароль
  6. Используются для resource-owner.username и resource-owner.password

Правила именования

Для баз данных и таблиц

  • Базы данных должны содержать имя тенанта: {tenant_name}_database
  • Таблицы следуют шаблону: s3a://{tenant_name}/{database_name}/{table_name}
  • S3 бакеты соответствуют именам тенантов

Примеры корректных имен

-- Корректно (содержит имя тенанта)
CREATE DATABASE newtenant_sales;

-- Некорректно (не содержит имя тенанта)
CREATE DATABASE sales;  -- Ошибка: доступ запрещен

Настройка Spark History Server

Для мониторинга Spark-приложений необходимо настроить event log:

eventlog_config = {
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": f"s3a://{tenant_name}/eventlog",

    # Настройки S3 для eventlog
    "spark.hadoop.fs.s3a.access.key": "your_s3_access_key",
    "spark.hadoop.fs.s3a.secret.key": "your_s3_secret_key",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.endpoint": "http://s3balancer.company.local:9878",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.multiobjectdelete.enable": "false",  # Требуется для Ozone
}

# Добавить к основной конфигурации
configuration.update(eventlog_config)

Решение проблем

Распространенные ошибки и решения

1. Ошибка аутентификации OAuth2

Authentication failed: Invalid client credentials
- Проверьте client_id и client_secret в секрете keycloak - Убедитесь, что пользовательские учетные данные корректны

2. Ошибка доступа к S3

AccessDenied: Access Denied
- Проверьте S3 credentials в секрете ozone - Убедитесь, что бакет существует и доступен

3. Ошибка создания базы данных

Database name must contain tenant name
- Убедитесь, что имя базы данных содержит имя тенанта - Пример: newtenant_analytics вместо analytics

4. Проблемы с токенами в StarRocks

403 ExpiredSignatureError: Signature has expired
- Пересоздайте каталог в StarRocks - Проблема связана с обновлением OAuth2 токенов

Диагностика подключения

# Проверка доступности каталога
spark.sql("SHOW NAMESPACES IN wh").show()

# Проверка таблиц
spark.sql("SHOW TABLES IN wh.newtenant_analytics").show()

# Тестовый запрос
spark.sql("SELECT * FROM wh.newtenant_analytics.test_table LIMIT 1").show()

Лучшие практики

Безопасность

  • Никогда не храните учетные данные в коде
  • Используйте секреты Kubernetes для управления credentials
  • Регулярно обновляйте пароли и токены

Производительность

  • Используйте партиционирование таблиц по датам
  • Настраивайте соответствующие размеры исполнителей Spark
  • Мониторьте использование ресурсов через Spark History Server

Управление данными

  • Следуйте соглашениям по именованию
  • Регулярно архивируйте старые данные
  • Используйте версионирование Iceberg для отслеживания изменений

Iceberg Catalog обеспечивает единый интерфейс для работы с табличными данными в объектном хранилище, поддерживая ACID-транзакции, schema evolution и мультитенантность.