Skip to content

Работа с Iceberg таблицами: запросы и операции

Введение

Это руководство охватывает основные операции работы с Apache Iceberg таблицами в LakeHouse платформе, включая создание, чтение, обновление и удаление данных через Spark и StarRocks.

Базовые операции через Spark

Создание таблиц

# Создание namespace (базы данных)
spark.sql("CREATE NAMESPACE IF NOT EXISTS myproject_analytics")

# Создание таблицы с явным указанием location
spark.sql("""
CREATE TABLE myproject_analytics.sales (
    id BIGINT,
    product STRING,
    amount DECIMAL(10,2),
    sale_date DATE
) USING iceberg
LOCATION 's3a://myproject/myproject_analytics/sales'
""")

# Создание таблицы из DataFrame с автоматической схемой
df = spark.range(1000).selectExpr(
    "id", 
    "concat('product_', id) as product_name",
    "rand() * 100 as price",
    "current_date() as created_date"
)
df.writeTo("myproject_analytics.products").create()

Вставка данных

# Вставка одиночных записей
spark.sql("""
INSERT INTO myproject_analytics.sales VALUES
(1, 'laptop', 999.99, '2024-01-15'),
(2, 'mouse', 29.99, '2024-01-16')
""")

# Вставка из DataFrame
data = [
    (3, 'keyboard', 79.99, '2024-01-17'),
    (4, 'monitor', 299.99, '2024-01-18')
]
df = spark.createDataFrame(data, ["id", "product", "amount", "sale_date"])
df.writeTo("myproject_analytics.sales").append()

# Пакетная вставка с перезаписью
df.writeTo("myproject_analytics.sales").overwrite()

Чтение данных

# Базовые запросы
sales_df = spark.table("myproject_analytics.sales")
sales_df.show()

# SQL запросы с фильтрацией
high_value_sales = spark.sql("""
SELECT * FROM myproject_analytics.sales 
WHERE amount > 100
ORDER BY sale_date DESC
""")

# Агрегации
summary = spark.sql("""
SELECT 
    product,
    COUNT(*) as transaction_count,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_amount
FROM myproject_analytics.sales 
GROUP BY product
""")

Обновление и удаление данных

# Обновление записей (MERGE)
spark.sql("""
MERGE INTO myproject_analytics.sales AS target
USING (
    SELECT 1 as id, 'gaming_laptop' as product, 1299.99 as amount, '2024-01-15' as sale_date
) AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

# Удаление записей
spark.sql("DELETE FROM myproject_analytics.sales WHERE amount < 10")

# Удаление по условию через DataFrame
spark.table("myproject_analytics.sales") \
    .filter("sale_date < '2024-01-01'") \
    .delete()

Работа через StarRocks

Создание внешнего каталога

-- Создание внешнего каталога для доступа к Iceberg данным
CREATE EXTERNAL CATALOG iceberg_myproject
PROPERTIES (
    "aws.s3.access_key" = "your_access_key",
    "aws.s3.secret_key" = "your_secret_key",
    "aws.s3.endpoint" = "http://s3balancer.company.local:9878",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.region" = "us-east-1",

    "type" = "iceberg",
    "iceberg.catalog.type" = "rest",
    "iceberg.catalog.uri" = "http://proxy-adapter-rest-fixture.platform-ldap.svc.cluster.local:8000",
    "iceberg.catalog.io-impl" = "org.apache.iceberg.aws.s3.S3FileIO",

    "iceberg.catalog.security" = "OAUTH2",
    "iceberg.catalog.oauth2-token-refresh-enabled" = "true",
    "iceberg.catalog.credential" = "client_id:client_secret",
    "iceberg.catalog.oauth2-server-uri" = "http://keycloak.platform-ldap.svc.cluster.local:80/realms/platform-ldap/protocol/openid-connect/token",

    "client.factory" = "com.starrocks.connector.iceberg.IcebergClientFactory"
);

Запросы через StarRocks

-- Использование каталога
SET CATALOG iceberg_myproject;

-- Просмотр доступных баз данных
SHOW DATABASES;

-- Использование базы данных
USE myproject_analytics;

-- Просмотр таблиц
SHOW TABLES;

-- Базовые запросы
SELECT * FROM sales WHERE amount > 100;

-- Агрегации
SELECT 
    product,
    COUNT(*) as count,
    SUM(amount) as total
FROM sales 
GROUP BY product 
ORDER BY total DESC;

-- JOIN операций между таблицами
SELECT 
    s.product,
    s.amount,
    p.product_name
FROM sales s
JOIN products p ON s.product = p.product_name;

Продвинутые операции

Time Travel и версионность

# Чтение данных на определенный момент времени
spark.read \
    .option("as-of-timestamp", "2024-01-01 12:00:00") \
    .table("myproject_analytics.sales")

# Чтение конкретной версии снимка
spark.read \
    .option("snapshot-id", "1234567890123456789") \
    .table("myproject_analytics.sales")

# Просмотр истории таблицы
spark.sql("SELECT * FROM myproject_analytics.sales.snapshots").show()

# Откат к предыдущей версии
spark.sql("CALL myproject_analytics.rollback_to_snapshot('myproject_analytics.sales', 1234567890123456789)")

Schema Evolution

# Добавление нового столбца
spark.sql("ALTER TABLE myproject_analytics.sales ADD COLUMN customer_id STRING")

# Переименование столбца
spark.sql("ALTER TABLE myproject_analytics.sales RENAME COLUMN product TO product_name")

# Изменение типа данных
spark.sql("ALTER TABLE myproject_analytics.sales ALTER COLUMN amount TYPE DECIMAL(12,2)")

# Удаление столбца
spark.sql("ALTER TABLE myproject_analytics.sales DROP COLUMN customer_id")

Оптимизация таблиц

# Компактизация файлов данных
spark.sql("CALL myproject_analytics.rewrite_data_files('myproject_analytics.sales')")

# Удаление старых снимков
spark.sql("CALL myproject_analytics.expire_snapshots('myproject_analytics.sales', TIMESTAMP '2024-01-01 00:00:00')")

# Оптимизация для запросов
spark.sql("CALL myproject_analytics.rewrite_manifests('myproject_analytics.sales')")

Мультитенантные сценарии

Работа с данными разных тенантов

# Создание таблицы с явным указанием tenant prefix
tenant_name = "myproject"

# Автоматическое добавление tenant prefix к именам таблиц
def create_tenant_table(table_name, schema):
    full_table_name = f"{tenant_name}_{table_name}"
    spark.sql(f"CREATE TABLE {full_table_name} {schema}")

# Пример использования
create_tenant_table("users", "(id BIGINT, name STRING, email STRING)")

Безопасность и доступы

# Проверка доступов перед операциями
def check_tenant_access(table_name):
    if not table_name.startswith(tenant_name + "_"):
        raise Exception(f"Access denied to table {table_name}")

    # Дополнительные проверки прав доступа
    # ...

# Безопасное выполнение запросов
def safe_query(query):
    # Проверка, что запрос не обращается к чужим данным
    if tenant_name not in query:
        raise Exception("Query must reference tenant-specific tables")

    return spark.sql(query)

Мониторинг и отладка

Просмотр метаданных таблиц

# Информация о таблице
spark.sql("DESCRIBE EXTENDED myproject_analytics.sales").show(truncate=False)

# Статистика таблицы
spark.sql("ANALYZE TABLE myproject_analytics.sales COMPUTE STATISTICS")

# Просмотр файлов таблицы
spark.sql("SELECT * FROM myproject_analytics.sales.files").show()

# Просмотр манифестов
spark.sql("SELECT * FROM myproject_analytics.sales.manifests").show()

Логирование и трассировка

# Включение детального логирования
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Мониторинг выполнения запросов
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

Решение проблем

Частые ошибки и решения

Ошибка: Table not found

# Проверка существования таблицы
if spark.catalog.tableExists("myproject_analytics.sales"):
    # Операции с таблицей
    pass
else:
    print("Таблица не существует")

Ошибка: Permission denied

# Проверка доступов в S3
aws s3 ls s3://myproject/myproject_analytics/ --endpoint-url http://s3balancer.company.local:9878

Ошибка: Schema mismatch

# Принудительное приведение схемы
df.writeTo("myproject_analytics.sales").option("mergeSchema", "true").append()

Производительность запросов

# Настройка параллелизма
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "100")

# Кэширование часто используемых таблиц
spark.table("myproject_analytics.sales").cache()
spark.table("myproject_analytics.sales").count()  # Активация кэширования

# Использование партиционирования
spark.sql("""
CREATE TABLE myproject_analytics.sales_partitioned (
    id BIGINT,
    product STRING,
    amount DECIMAL(10,2)
) USING iceberg
PARTITIONED BY (days(sale_date))
LOCATION 's3a://myproject/myproject_analytics/sales_partitioned'
""")

Заключение

Это руководство охватывает основные сценарии работы с Iceberg таблицами в LakeHouse платформе. Для более сложных операций и оптимизаций обратитесь к официальной документации Apache Iceberg и StarRocks.

Полезные ссылки