Skip to content

Чтение из Hive с помощью Hive.sql

Hive.sql позволяет передавать пользовательский SQL-запрос, но не поддерживает инкрементальные стратегии.

Поддержка синтаксиса

Поддерживаются только запросы со следующим синтаксисом:

  • ✅︎ SELECT ... FROM ...
  • ✅︎ WITH alias AS (...) SELECT ...
  • SET ...; SELECT ...; - несколько операторов не поддерживаются

Warning

Фактически, запрос должен быть написан с использованием синтаксиса SparkSQL, а не HiveQL.

Примеры

```python
from onetl.connection import Hive

hive = Hive(...)
df = hive.sql(
    """
    SELECT
        id,
        key,
        CAST(value AS text) value,
        updated_at
    FROM
        some.mytable
    WHERE
        key = 'something'
    """
)   
```

Рекомендации

Используйте столбцовые форматы записи

Предпочтительны следующие форматы записи:

Для столбцовых форматов записи каждый файл содержит отдельные секции, где хранятся данные столбцов. В нижнем колонтитуле файла содержится информация о расположении каждой секции/группы столбцов. Spark может использовать эту информацию для загрузки только тех секций, которые требуются для конкретного запроса, например, только выбранных столбцов, что значительно ускоряет выполнение запроса.

Еще одно преимущество — высокий коэффициент сжатия, например, в 10-100 раз по сравнению с JSON или CSV.

Выбирайте только необходимые столбцы

Вместо использования SELECT * FROM ... предпочтительнее указывать точные имена столбцов SELECT col1, col2, .... Это значительно снижает объем данных, считываемых Spark, если используются столбцовые форматы файлов.

Используйте столбцы разделов в условии where

Запросы должны включать условие WHERE с фильтрами по столбцам секционирования Hive. Это позволяет Spark читать только небольшой набор файлов (отсечение разделов) вместо сканирования всей таблицы, что значительно повышает производительность.

Поддерживаемые операторы: =, >, < и BETWEEN, и только по отношению к некоторым статическим значениям.

Подробнее

Lazily execute SELECT statement and return DataFrame. |support_hooks|

Same as spark.sql(query).

.. versionadded:: 0.2.0

Parameters

query : str

SQL query to be executed.

Returns

df : pyspark.sql.dataframe.DataFrame

Spark dataframe
Source code in onetl/connection/db_connection/hive/connection.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@slot
def sql(
    self,
    query: str,
) -> DataFrame:
    """
    Lazily execute SELECT statement and return DataFrame. |support_hooks|

    Same as ``spark.sql(query)``.

    .. versionadded:: 0.2.0

    Parameters
    ----------
    query : str

        SQL query to be executed.

    Returns
    -------
    df : pyspark.sql.dataframe.DataFrame

        Spark dataframe
    """

    query = clear_statement(query)

    log.info("|%s| Executing SQL query:", self.__class__.__name__)
    log_lines(log, query)

    with SparkMetricsRecorder(self.spark) as recorder:
        try:
            with override_job_description(self.spark, f"{self}.sql()"):
                df = self._execute_sql(query)
        except Exception:
            log.error("|%s| Query failed", self.__class__.__name__)

            metrics = recorder.metrics()
            if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
                # as SparkListener results are not guaranteed to be received in time,
                # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
                log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
                log_lines(log, str(metrics), level=logging.DEBUG)
            raise

        log.info("|Spark| DataFrame successfully created from SQL statement")

        metrics = recorder.metrics()
        if log.isEnabledFor(logging.DEBUG) and not metrics.is_empty:
            # as SparkListener results are not guaranteed to be received in time,
            # some metrics may be missing. To avoid confusion, log only in debug, and with a notice
            log.info("|%s| Recorded metrics (some values may be missing!):", self.__class__.__name__)
            log_lines(log, str(metrics), level=logging.DEBUG)

    return df