
Executing statements in Hive

Use Hive.execute(...) to execute DDL and DML statements.

Syntax support

This method accepts any statement syntax supported by Hive, for example:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ..., and so on
  • ✅︎ LOAD DATA ..., and so on
  • ✅︎ ALTER ...
  • ✅︎ INSERT INTO ... SELECT ..., and so on
  • ✅︎ DROP TABLE ..., DROP VIEW ..., and so on
  • ✅︎ MSCK REPAIR TABLE ..., and so on
  • ✅︎ other statements not mentioned here
  • ❌︎ SET ...; SELECT ...; - multiple statements in one call are not supported
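Each call accepts exactly one statement. A script with several statements therefore has to be split and run one call at a time.

Below is a minimal sketch of that approach. The helpers `split_statements` and `execute_script` are hypothetical and not part of onETL. The split on `;` is deliberately naive: it does not handle semicolons inside string literals or comments. Any object with an `execute(statement)` method can be passed in, for example a `Hive` connection.

```python
from typing import List, Protocol


class SupportsExecute(Protocol):
    """Anything with an execute(statement) method, e.g. an onETL Hive connection."""

    def execute(self, statement: str) -> None: ...


def split_statements(script: str) -> List[str]:
    # Naive split on ";" (assumption): breaks on semicolons inside literals or comments.
    return [part.strip() for part in script.split(";") if part.strip()]


def execute_script(connection: SupportsExecute, script: str) -> None:
    # Hypothetical helper: one execute() call per statement,
    # because a single call does not accept multiple statements.
    for statement in split_statements(script):
        connection.execute(statement)
```

With a real connection this could be called as `execute_script(hive, "SET ...; INSERT INTO ... SELECT ...")`, replacing the `...` with actual statements.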

Warning

In fact, the statement must be written using SparkSQL syntax, not HiveQL.

Examples

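    from onetl.connection import Hive

    hive = Hive(...)

    # DROP TABLE raises an error if the table does not exist
    hive.execute("DROP TABLE schema.table")

    # Hive-format table in SparkSQL: partition columns are declared only in PARTITIONED BY
    hive.execute(
        """
        CREATE TABLE schema.table (
            id BIGINT,
            key STRING,
            value DOUBLE
        )
        PARTITIONED BY (business_date DATE)
        STORED AS orc
        """
    )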
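The example uses the Hive table format as accepted by SparkSQL. In that format, partition columns are declared only in `PARTITIONED BY`, together with their types, and are not repeated in the main column list.

Below is a minimal sketch of a helper that assembles such a statement as a plain string, ready to pass to `hive.execute(...)`. The helper `build_create_table` and its parameters are hypothetical and not part of onETL.

```python
from typing import Dict


def build_create_table(
    table: str,
    columns: Dict[str, str],
    partitions: Dict[str, str],
    stored_as: str = "ORC",
) -> str:
    # Hypothetical helper: renders a Hive-format CREATE TABLE statement for SparkSQL.
    # Partition columns belong only in PARTITIONED BY, never in the column list.
    overlap = set(columns) & set(partitions)
    if overlap:
        raise ValueError(f"Partition columns must not be in the column list: {sorted(overlap)}")

    column_sql = ", ".join(f"{name} {type_}" for name, type_ in columns.items())
    statement = f"CREATE TABLE {table} ({column_sql})"
    if partitions:
        partition_sql = ", ".join(f"{name} {type_}" for name, type_ in partitions.items())
        statement += f" PARTITIONED BY ({partition_sql})"
    return f"{statement} STORED AS {stored_as}"


statement = build_create_table(
    "schema.table",
    columns={"id": "BIGINT", "key": "STRING", "value": "DOUBLE"},
    partitions={"business_date": "DATE"},
)
# statement == "CREATE TABLE schema.table (id BIGINT, key STRING, value DOUBLE)
#               PARTITIONED BY (business_date DATE) STORED AS ORC" (one line)
# hive.execute(statement)  # with a real Hive connection
```

Rejecting overlapping names up front catches a common mistake: listing the partition column twice makes the statement fail.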

Details

Execute a DDL or DML statement. Supports hooks.

Added in 0.2.0

Parameters

statement : str

Statement to be executed.
Source code in onetl/connection/db_connection/hive/connection.py
    @slot
    def execute(
        self,
        statement: str,
    ) -> None:
        """
        Execute DDL or DML statement. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

        !!! success "Added in 0.2.0"

        Parameters
        ----------
        statement : str

            Statement to be executed.
        """

        statement = clear_statement(statement)

        log.info("|%s| Executing statement:", self.__class__.__name__)
        log_lines(log, statement)

        with SparkMetricsRecorder(self.spark) as recorder:
            try:
                with override_job_description(self.spark, f"{self}.execute()"):
                    self._execute_sql(statement).collect()
            except Exception:
                log.exception("|%s| Execution failed", self.__class__.__name__)

                metrics = recorder.metrics()
                if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
                    # as SparkListener results are not guaranteed to be received in time,
                    # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
                    log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
                    log_lines(log, str(metrics), level=logging.DEBUG)
                raise

            log.info("|%s| Execution succeeded", self.__class__.__name__)

            metrics = recorder.metrics()
            if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
                # as SparkListener results are not guaranteed to be received in time,
                # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
                log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
                log_lines(log, str(metrics), level=logging.DEBUG)
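The method follows a common logging pattern:

1. Normalize the statement.
2. Log the statement, then run it.
3. Log success or failure.
4. Dump metrics only at DEBUG level, since they may be incomplete.
5. On failure, re-raise the original exception unchanged.

Below is a minimal, standard-library-only sketch of that pattern. It does not reproduce onETL internals. `normalize_statement` stands in for `clear_statement` and is assumed here to strip surrounding whitespace and trailing semicolons. `execute_logged` and the `run`/`collect_metrics` callables are hypothetical.

```python
import logging
from typing import Callable, Dict

log = logging.getLogger(__name__)


def normalize_statement(statement: str) -> str:
    # Hypothetical stand-in for clear_statement: trim whitespace and trailing ";".
    return statement.strip().rstrip(";").strip()


def _log_metrics(metrics: Dict[str, int]) -> None:
    # Metrics may arrive late, so they are logged only at DEBUG level, with a notice.
    if log.isEnabledFor(logging.DEBUG) and metrics:
        log.debug("Recorded metrics (some values may be missing!): %s", metrics)


def execute_logged(
    run: Callable[[str], None],
    collect_metrics: Callable[[], Dict[str, int]],
    statement: str,
) -> None:
    statement = normalize_statement(statement)
    log.info("Executing statement: %s", statement)
    try:
        run(statement)
    except Exception:
        log.exception("Execution failed")
        _log_metrics(collect_metrics())
        raise  # bare raise keeps the original exception and traceback
    log.info("Execution succeeded")
    _log_metrics(collect_metrics())
```

Moving the duplicated metrics block into one helper keeps the success and failure branches consistent. The bare `raise` lets callers see the original Spark error rather than a wrapped one.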