Работа с Iceberg таблицами: запросы и операции
Введение
Это руководство охватывает основные операции работы с Apache Iceberg таблицами в LakeHouse платформе, включая создание, чтение, обновление и удаление данных через Spark и StarRocks.
Базовые операции через Spark
Создание таблиц
# Создание namespace (базы данных)
spark.sql("CREATE NAMESPACE IF NOT EXISTS myproject_analytics")
# Создание таблицы с явным указанием location
spark.sql("""
CREATE TABLE myproject_analytics.sales (
id BIGINT,
product STRING,
amount DECIMAL(10,2),
sale_date DATE
) USING iceberg
LOCATION 's3a://myproject/myproject_analytics/sales'
""")
# Создание таблицы из DataFrame с автоматической схемой
df = spark.range(1000).selectExpr(
"id",
"concat('product_', id) as product_name",
"rand() * 100 as price",
"current_date() as created_date"
)
df.writeTo("myproject_analytics.products").create()
Вставка данных
# Вставка одиночных записей
spark.sql("""
INSERT INTO myproject_analytics.sales VALUES
(1, 'laptop', 999.99, '2024-01-15'),
(2, 'mouse', 29.99, '2024-01-16')
""")
# Вставка из DataFrame
data = [
(3, 'keyboard', 79.99, '2024-01-17'),
(4, 'monitor', 299.99, '2024-01-18')
]
df = spark.createDataFrame(data, ["id", "product", "amount", "sale_date"])
df.writeTo("myproject_analytics.sales").append()
# Пакетная вставка с перезаписью
df.writeTo("myproject_analytics.sales").overwrite()
Чтение данных
# Базовые запросы
sales_df = spark.table("myproject_analytics.sales")
sales_df.show()
# SQL запросы с фильтрацией
high_value_sales = spark.sql("""
SELECT * FROM myproject_analytics.sales
WHERE amount > 100
ORDER BY sale_date DESC
""")
# Агрегации
summary = spark.sql("""
SELECT
product,
COUNT(*) as transaction_count,
SUM(amount) as total_revenue,
AVG(amount) as avg_amount
FROM myproject_analytics.sales
GROUP BY product
""")
Обновление и удаление данных
# Обновление записей (MERGE)
spark.sql("""
MERGE INTO myproject_analytics.sales AS target
USING (
SELECT 1 as id, 'gaming_laptop' as product, 1299.99 as amount, '2024-01-15' as sale_date
) AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Удаление записей
spark.sql("DELETE FROM myproject_analytics.sales WHERE amount < 10")
# Удаление по условию через DataFrame
spark.table("myproject_analytics.sales") \
.filter("sale_date < '2024-01-01'") \
.delete()
Работа через StarRocks
Создание внешнего каталога
-- Создание внешнего каталога для доступа к Iceberg данным
CREATE EXTERNAL CATALOG iceberg_myproject
PROPERTIES (
"aws.s3.access_key" = "your_access_key",
"aws.s3.secret_key" = "your_secret_key",
"aws.s3.endpoint" = "http://s3balancer.company.local:9878",
"aws.s3.enable_path_style_access" = "true",
"aws.s3.region" = "us-east-1",
"type" = "iceberg",
"iceberg.catalog.type" = "rest",
"iceberg.catalog.uri" = "http://proxy-adapter-rest-fixture.platform-ldap.svc.cluster.local:8000",
"iceberg.catalog.io-impl" = "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.security" = "OAUTH2",
"iceberg.catalog.oauth2-token-refresh-enabled" = "true",
"iceberg.catalog.credential" = "client_id:client_secret",
"iceberg.catalog.oauth2-server-uri" = "http://keycloak.platform-ldap.svc.cluster.local:80/realms/platform-ldap/protocol/openid-connect/token",
"client.factory" = "com.starrocks.connector.iceberg.IcebergClientFactory"
);
Запросы через StarRocks
-- Использование каталога
SET CATALOG iceberg_myproject;
-- Просмотр доступных баз данных
SHOW DATABASES;
-- Использование базы данных
USE myproject_analytics;
-- Просмотр таблиц
SHOW TABLES;
-- Базовые запросы
SELECT * FROM sales WHERE amount > 100;
-- Агрегации
SELECT
product,
COUNT(*) as count,
SUM(amount) as total
FROM sales
GROUP BY product
ORDER BY total DESC;
-- JOIN операций между таблицами
SELECT
s.product,
s.amount,
p.product_name
FROM sales s
JOIN products p ON s.product = p.product_name;
Продвинутые операции
Time Travel и версионность
# Чтение данных на определенный момент времени
spark.read \
.option("as-of-timestamp", "2024-01-01 12:00:00") \
.table("myproject_analytics.sales")
# Чтение конкретной версии снимка
spark.read \
.option("snapshot-id", "1234567890123456789") \
.table("myproject_analytics.sales")
# Просмотр истории таблицы
spark.sql("SELECT * FROM myproject_analytics.sales.snapshots").show()
# Откат к предыдущей версии
spark.sql("CALL myproject_analytics.rollback_to_snapshot('myproject_analytics.sales', 1234567890123456789)")
Schema Evolution
# Добавление нового столбца
spark.sql("ALTER TABLE myproject_analytics.sales ADD COLUMN customer_id STRING")
# Переименование столбца
spark.sql("ALTER TABLE myproject_analytics.sales RENAME COLUMN product TO product_name")
# Изменение типа данных
spark.sql("ALTER TABLE myproject_analytics.sales ALTER COLUMN amount TYPE DECIMAL(12,2)")
# Удаление столбца
spark.sql("ALTER TABLE myproject_analytics.sales DROP COLUMN customer_id")
Оптимизация таблиц
# Компактизация файлов данных
spark.sql("CALL myproject_analytics.rewrite_data_files('myproject_analytics.sales')")
# Удаление старых снимков
spark.sql("CALL myproject_analytics.expire_snapshots('myproject_analytics.sales', TIMESTAMP '2024-01-01 00:00:00')")
# Оптимизация для запросов
spark.sql("CALL myproject_analytics.rewrite_manifests('myproject_analytics.sales')")
Мультитенантные сценарии
Работа с данными разных тенантов
# Создание таблицы с явным указанием tenant prefix
tenant_name = "myproject"
# Автоматическое добавление tenant prefix к именам таблиц
def create_tenant_table(table_name, schema):
full_table_name = f"{tenant_name}_{table_name}"
spark.sql(f"CREATE TABLE {full_table_name} {schema}")
# Пример использования
create_tenant_table("users", "(id BIGINT, name STRING, email STRING)")
Безопасность и доступы
# Проверка доступов перед операциями
def check_tenant_access(table_name):
if not table_name.startswith(tenant_name + "_"):
raise Exception(f"Access denied to table {table_name}")
# Дополнительные проверки прав доступа
# ...
# Безопасное выполнение запросов
def safe_query(query):
# Проверка, что запрос не обращается к чужим данным
if tenant_name not in query:
raise Exception("Query must reference tenant-specific tables")
return spark.sql(query)
Мониторинг и отладка
Просмотр метаданных таблиц
# Информация о таблице
spark.sql("DESCRIBE EXTENDED myproject_analytics.sales").show(truncate=False)
# Статистика таблицы
spark.sql("ANALYZE TABLE myproject_analytics.sales COMPUTE STATISTICS")
# Просмотр файлов таблицы
spark.sql("SELECT * FROM myproject_analytics.sales.files").show()
# Просмотр манифестов
spark.sql("SELECT * FROM myproject_analytics.sales.manifests").show()
Логирование и трассировка
# Включение детального логирования
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Мониторинг выполнения запросов
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
Решение проблем
Частые ошибки и решения
Ошибка: Table not found
# Проверка существования таблицы
if spark.catalog.tableExists("myproject_analytics.sales"):
# Операции с таблицей
pass
else:
print("Таблица не существует")
Ошибка: Permission denied
# Проверка доступов в S3
aws s3 ls s3://myproject/myproject_analytics/ --endpoint-url http://s3balancer.company.local:9878
Ошибка: Schema mismatch
# Принудительное приведение схемы
df.writeTo("myproject_analytics.sales").option("mergeSchema", "true").append()
Производительность запросов
# Настройка параллелизма
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "100")
# Кэширование часто используемых таблиц
spark.table("myproject_analytics.sales").cache()
spark.table("myproject_analytics.sales").count() # Активация кэширования
# Использование партиционирования
spark.sql("""
CREATE TABLE myproject_analytics.sales_partitioned (
id BIGINT,
product STRING,
amount DECIMAL(10,2)
) USING iceberg
PARTITIONED BY (days(sale_date))
LOCATION 's3a://myproject/myproject_analytics/sales_partitioned'
""")
Заключение
Это руководство охватывает основные сценарии работы с Iceberg таблицами в LakeHouse платформе. Для более сложных операций и оптимизаций обратитесь к официальной документации Apache Iceberg и StarRocks.