
Executing statements in Iceberg

Use Iceberg.execute(...) to run DDL and DML operations.

Warning

Table names in DML/DDL statements must include the catalog prefix, for example my_catalog.my_schema.my_table.
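One way to avoid forgetting the catalog prefix is to build table names in a single place. The sketch below is only an illustration: qualified_table_name is a hypothetical helper, not part of onETL, and it assumes a three-part catalog.schema.table naming scheme with plain identifiers.

```python
def qualified_table_name(catalog: str, schema: str, table: str) -> str:
    """Join catalog, schema and table into a fully qualified name.

    Hypothetical helper (not an onETL API). It rejects empty parts and
    parts that already contain a dot, to catch accidental double prefixes.
    """
    parts = (catalog, schema, table)
    for part in parts:
        if not part or "." in part:
            raise ValueError(f"Invalid identifier part: {part!r}")
    return ".".join(parts)


# Build a statement whose table name always carries the catalog prefix.
drop_statement = f"DROP TABLE {qualified_table_name('my_catalog', 'my_schema', 'my_table')}"
```

The resulting string can then be passed to Iceberg.execute(...) as usual.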

Syntax support

This method supports any statement syntax supported by Iceberg (Spark SQL), such as:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ...
  • ✅︎ INSERT INTO ... SELECT ..., MERGE INTO ...
  • ✅︎ ALTER TABLE ... ADD COLUMN, ALTER TABLE ... DROP COLUMN
  • ✅︎ DROP TABLE ..., DROP VIEW ...
  • ✅︎ REPLACE TABLE ...
  • ✅︎ other statements supported by Iceberg
  • ❌ SET ...; SELECT ...; - multiple statements in one call are not supported
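Because a single call accepts only one statement, a script with several statements has to be split and executed one statement at a time. The following is a minimal sketch under stated assumptions: split_statements and execute_each are hypothetical helpers, the connection is anything exposing execute(statement) (such as an Iceberg connection), and the naive split assumes no ";" appears inside string literals or comments.

```python
from typing import List, Protocol


class SupportsExecute(Protocol):
    """Any object with an ``execute(statement)`` method."""

    def execute(self, statement: str) -> None: ...


def split_statements(script: str) -> List[str]:
    """Naively split a script on ';' and drop empty fragments.

    Assumption: no statement contains ';' inside a string literal or comment.
    """
    return [part.strip() for part in script.split(";") if part.strip()]


def execute_each(connection: SupportsExecute, script: str) -> int:
    """Run every statement of ``script`` with a separate ``execute`` call.

    Returns the number of statements executed. An exception raised by
    ``execute`` propagates, so later statements are not run after a failure.
    """
    statements = split_statements(script)
    for statement in statements:
        connection.execute(statement)
    return len(statements)
```

A script that legitimately contains semicolons inside literals would need a real SQL parser instead of this split.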

Examples

from onetl.connection import Iceberg

iceberg = Iceberg(catalog_name="my_catalog", ...)

iceberg.execute("DROP TABLE my_catalog.my_schema.my_table")
iceberg.execute(
    """
    CREATE TABLE my_catalog.my_schema.my_table (
        id BIGINT,
        key STRING,
        value DOUBLE
    )
    USING iceberg
    """,
)
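The example above fails on the DROP TABLE call if the table does not exist yet. A possible variation, sketched here with a hypothetical helper recreate_table_statements (not an onETL API), builds a DROP TABLE IF EXISTS statement followed by a CREATE TABLE statement, so the pair can be re-run safely. The column names and types are taken from the example above.

```python
from typing import Dict, List


def recreate_table_statements(table: str, columns: Dict[str, str]) -> List[str]:
    """Build the two statements needed to drop and recreate an Iceberg table.

    Hypothetical helper: ``table`` must already include the catalog prefix,
    and ``columns`` maps column names to Spark SQL types.
    """
    column_lines = ",\n".join(f"    {name} {dtype}" for name, dtype in columns.items())
    return [
        f"DROP TABLE IF EXISTS {table}",
        f"CREATE TABLE {table} (\n{column_lines}\n)\nUSING iceberg",
    ]


statements = recreate_table_statements(
    "my_catalog.my_schema.my_table",
    {"id": "BIGINT", "key": "STRING", "value": "DOUBLE"},
)
```

Each element of statements would then be passed to iceberg.execute(...) in its own call, since multiple statements per call are not supported.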

Details

Execute a DDL or DML statement. Supports hooks.

Parameters

statement : str

Statement to be executed.
Source code in onetl/connection/db_connection/iceberg/connection.py
@slot
def execute(
    self,
    statement: str,
) -> None:
    """
    Execute DDL or DML statement. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    Parameters
    ----------
    statement : str

        Statement to be executed.
    """

    statement = clear_statement(statement)

    log.info("|%s| Executing statement:", self.__class__.__name__)
    log_lines(log, statement)

    with SparkMetricsRecorder(self.spark) as recorder:
        try:
            with override_job_description(self.spark, f"{self}.execute()"):
                self._execute_sql(statement).collect()
        except Exception:
            log.exception("|%s| Execution failed", self.__class__.__name__)
            metrics = recorder.metrics()
            if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
                # as SparkListener results are not guaranteed to be received in time,
                # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
                log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
                log_lines(log, str(metrics), level=logging.DEBUG)
            raise

        log.info("|%s| Execution succeeded", self.__class__.__name__)

        metrics = recorder.metrics()
        if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
            # as SparkListener results are not guaranteed to be received in time,
            # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
            log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
            log_lines(log, str(metrics), level=logging.DEBUG)
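As the listing shows, recorded Spark metrics are logged only when DEBUG is enabled for the module logger. A minimal sketch of turning that on with the standard logging module follows. It assumes the module logger is created with logging.getLogger(__name__) and therefore lives under the "onetl" namespace; the listing does not show that line, so treat the logger name as an assumption.

```python
import logging

# Emit INFO and above for everything by default.
logging.basicConfig(level=logging.INFO)

# Assumption: onETL module loggers are children of the "onetl" logger,
# so lowering its level to DEBUG also enables the metrics output of execute().
onetl_logger = logging.getLogger("onetl")
onetl_logger.setLevel(logging.DEBUG)
```

Even with DEBUG enabled, the listing's own comments note that some metric values may be missing, because SparkListener results are not guaranteed to arrive in time.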