
Executing statements in Hive

Use Hive.execute(...) to run DDL and DML statements.

Syntax support

This method accepts any statement syntax supported by Hive, for example:

  • ✅︎ CREATE TABLE ..., CREATE VIEW ..., and so on
  • ✅︎ LOAD DATA ..., and so on
  • ✅︎ ALTER ...
  • ✅︎ INSERT INTO ... SELECT ..., and so on
  • ✅︎ DROP TABLE ..., DROP VIEW ..., and so on
  • ✅︎ MSCK REPAIR TABLE ..., and so on
  • ✅︎ other statements not listed here
  • ❌ SET ...; SELECT ...; - multiple statements are not supported

Warning

In practice, the statement must be written using SparkSQL syntax, not HiveQL.
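Because multiple semicolon-separated statements are not supported, a script has to be split and each statement executed on its own. A minimal sketch, assuming a hypothetical split_statements helper (not part of onetl) and a naive split on ";":

```python
# Hypothetical helper (not part of onetl): Hive.execute() runs a single
# statement, so split a multi-statement script and run each part separately.
# Note: a naive split on ";" breaks on semicolons inside string literals.
def split_statements(script: str) -> list[str]:
    return [part.strip() for part in script.split(";") if part.strip()]


statements = split_statements("SET hive.exec.dynamic.partition=true; SELECT 1")
# statements == ["SET hive.exec.dynamic.partition=true", "SELECT 1"]

# Each part can then be passed to Hive.execute() on its own:
# for statement in statements:
#     hive.execute(statement)
```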

Examples

```python
    from onetl.connection import Hive

    hive = Hive(...)

    hive.execute("DROP TABLE schema.table")
    hive.execute(
        """
        CREATE TABLE schema.table (
            id BIGINT,
            key STRING,
            value DOUBLE
        )
        PARTITIONED BY (business_date DATE)
        STORED AS orc
        """
    )
```
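DDL strings like the one above can also be assembled programmatically before being handed to execute. A minimal sketch, assuming a hypothetical build_create_table helper (not part of onetl) and SparkSQL column types:

```python
# Hypothetical helper (not part of onetl): assemble a CREATE TABLE
# statement from column and partition specs for use with Hive.execute().
def build_create_table(table, columns, partition):
    cols = ",\n    ".join(f"{name} {dtype}" for name, dtype in columns.items())
    parts = ", ".join(f"{name} {dtype}" for name, dtype in partition.items())
    return (
        f"CREATE TABLE {table} (\n    {cols}\n)\n"
        f"PARTITIONED BY ({parts})\n"
        "STORED AS orc"
    )


ddl = build_create_table(
    "schema.table",
    {"id": "BIGINT", "key": "STRING", "value": "DOUBLE"},
    {"business_date": "DATE"},
)
# hive.execute(ddl)
```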

Details

Execute DDL or DML statement. |support_hooks|

.. versionadded:: 0.2.0

Parameters

statement : str

Statement to be executed.
Source code in onetl/connection/db_connection/hive/connection.py
```python
@slot
def execute(
    self,
    statement: str,
) -> None:
    """
    Execute DDL or DML statement. |support_hooks|

    .. versionadded:: 0.2.0

    Parameters
    ----------
    statement : str

        Statement to be executed.
    """

    statement = clear_statement(statement)

    log.info("|%s| Executing statement:", self.__class__.__name__)
    log_lines(log, statement)

    with SparkMetricsRecorder(self.spark) as recorder:
        try:
            with override_job_description(self.spark, f"{self}.execute()"):
                self._execute_sql(statement).collect()
        except Exception:
            log.error("|%s| Execution failed", self.__class__.__name__)
            metrics = recorder.metrics()
            if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
                # as SparkListener results are not guaranteed to be received in time,
                # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
                log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
                log_lines(log, str(metrics), level=logging.DEBUG)
            raise

        log.info("|%s| Execution succeeded", self.__class__.__name__)

        metrics = recorder.metrics()
        if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
            # as SparkListener results are not guaranteed to be received in time,
            # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
            log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
            log_lines(log, str(metrics), level=logging.DEBUG)
```
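The method first normalizes the input via clear_statement(). Its exact behavior is internal to onetl, but a plausible sketch (an assumption, not the actual implementation) is trimming whitespace and a trailing semicolon, since execute accepts exactly one statement:

```python
# Assumption: a sketch of what clear_statement() might do; the real
# onetl implementation may differ.
def clear_statement_sketch(statement: str) -> str:
    # Drop surrounding whitespace and a trailing semicolon, since
    # Hive.execute() accepts exactly one statement.
    return statement.strip().rstrip(";").rstrip()


cleaned = clear_statement_sketch("  DROP TABLE schema.table;\n")
# cleaned == "DROP TABLE schema.table"
```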