Запись данных в Hive с помощью DBWriter
Для записи данных в Hive используйте DBWriter.
Примеры
```python
from onetl.connection import Hive
from onetl.db import DBWriter
hive = Hive(...)
df = ... # данные находятся здесь
# Создаем датафрейм с определенным количеством партиций Spark.
# Используем колонки партиционирования Hive для группировки данных. Создаем максимум 20 файлов на каждую партицию Hive.
# Также сортируем данные по колонке, с которой наиболее коррелируют данные (например, user_id), уменьшая размер файлов.
num_files_per_partition = 20
partition_columns = ["country", "business_date"]
sort_columns = ["user_id"]
write_df = df.repartition(
num_files_per_partition,
*partition_columns,
*sort_columns,
).sortWithinPartitions(*partition_columns, *sort_columns)
writer = DBWriter(
connection=hive,
target="schema.table",
options=Hive.WriteOptions(
if_exists="append",
# Колонки партиционирования Hive.
partitionBy=partition_columns,
),
)
writer.run(write_df)
```
Рекомендации
Используйте колоночные форматы записи
Предпочтительные форматы записи:
Warning
При использовании DBWriter формат данных spark по умолчанию, настроенный в spark.sql.sources.default, игнорируется, так как значение по умолчанию для Hive.WriteOptions(format=...) явно установлено на orc.
В колоночных форматах записи каждый файл содержит отдельные секции, где хранятся данные столбцов. Подвал файла содержит информацию о расположении каждой секции/группы столбцов. Spark может использовать эту информацию для загрузки только тех секций, которые требуются для конкретного запроса, например, только выбранных столбцов, чтобы значительно ускорить выполнение запроса.
Еще одно преимущество — высокий коэффициент сжатия, например, в 10-100 раз по сравнению с JSON или CSV.
Используйте партиционирование
Как это работает
Hive поддерживает разделение данных на партиции, которые представляют собой разные каталоги в файловой системе с именами типа some_col=value1/another_col=value2.
Например, датафрейм с содержимым:
| country: string | business_date: date | user_id: int | bytes: long |
|---|---|---|---|
| RU | 2024-01-01 | 1234 | 25325253525 |
| RU | 2024-01-01 | 2345 | 23234535243 |
| RU | 2024-01-02 | 1234 | 62346634564 |
| US | 2024-01-01 | 5678 | 4252345354 |
| US | 2024-01-02 | 5678 | 5474575745 |
| US | 2024-01-03 | 5678 | 3464574567 |
С параметром partitionBy=["country", "business_dt"] данные будут храниться в виде файлов в следующих подкаталогах:
/country=RU/business_date=2024-01-01//country=RU/business_date=2024-01-02//country=US/business_date=2024-01-01//country=US/business_date=2024-01-02//country=US/business_date=2024-01-03/
Отдельный подкаталог создается для каждой уникальной комбинации значений столбцов в датафрейме.
Пожалуйста, не путайте партиции датафрейма Spark (т.е. партии данных, обрабатываемые исполнителями Spark, обычно параллельно) и партиционирование Hive (хранение данных в разных подкаталогах). Количество партиций датафрейма Spark коррелирует с количеством файлов, создаваемых в каждой партиции Hive. Например, датафрейм Spark с 10 партициями и 5 различными значениями столбцов партиции Hive будет сохранен как 5 подкаталогов по 10 файлов в каждом = всего 50 файлов. Без партиционирования Hive все файлы размещаются в одном плоском каталоге.
Но зачем?
Запросы, которые имеют условие WHERE с фильтрами по столбцам партиционирования Hive, например WHERE country = 'RU' AND business_date='2024-01-01', будут считывать только файлы из этой конкретной партиции, например /country=RU/business_date=2024-01-01/, и пропускать файлы из других партиций.
Это значительно повышает производительность и снижает объем памяти, используемой Spark. Рекомендуется использовать партиционирование Hive во всех таблицах.
Какие колонки следует использовать?
Обычно столбцы партиционирования Hive основываются на дате события или местоположении, например country: string, business_date: date, run_date: date и т. д.
Столбцы партиций должны содержать данные с низкой кардинальностью. Даты, небольшие целые числа, строки с малым количеством возможных значений подходят. Но временные метки, числа с плавающей точкой, десятичные числа, длинные целые (например, идентификатор пользователя), строки с большим количеством уникальных значений (например, имя пользователя или email) НЕ должны использоваться в качестве столбцов партиционирования Hive. В отличие от некоторых других баз данных, партиции на основе диапазонов и хэш-функций не поддерживаются.
Столбец партиции должен быть частью датафрейма. Если вы хотите разбить значения по компоненту даты столбца business_dt: timestamp, добавьте новый столбец в датафрейм следующим образом: df.withColumn("business_date", date(df.business_dt)).
Используйте сжатие
Использование алгоритмов сжатия, таких как snappy, lz4 или zstd, может уменьшить размер файлов (до 10 раз).
Предпочитайте создавать большие файлы
HDFS и S3 не предназначены для хранения миллионов маленьких файлов. Минимальный размер файла должен быть не менее 10Мб, но обычно он составляет 128Мб+ или 256Мб+ (размер блока HDFS). НИКОГДА не создавайте файлы размером в несколько килобайт.
Количество файлов может различаться в разных случаях. С одной стороны, Spark Adaptive Query Execution (AQE) может объединять маленькие партиции датафрейма Spark в одну большую. С другой стороны, датафреймы с данными, в которых есть перекос могут создавать больше файлов, чем ожидалось.
Для создания небольшого количества больших файлов можно уменьшить количество партиций датафрейма Spark.
Используйте функцию df.repartition(N, columns...), например: df.repartition(20, "col1", "col2").
Это создаст новый датафрейм Spark с партициями, используя выражение hash(df.col1 + df.col2) mod 20, избегая перекоса данных.
Примечание: большие партиции датафрейма требуют больше ресурсов (CPU, RAM) на исполнителе Spark. Точное количество партиций должно определяться эмпирически, так как оно зависит от объема данных и доступных ресурсов.
Сортируйте данные перед записью
Датафрейм с отсортированным содержимым:
| country: string | business_date: date | user_id: int | business_dt: timestamp | bytes: long |
|---|---|---|---|---|
| RU | 2024-01-01 | 1234 | 2024-01-01T11:22:33.456 | 25325253525 |
| RU | 2024-01-01 | 1234 | 2024-01-01T12:23:44.567 | 25325253525 |
| RU | 2024-01-02 | 1234 | 2024-01-01T13:25:56.789 | 34335645635 |
| US | 2024-01-01 | 2345 | 2024-01-01T10:00:00.000 | 12341 |
| US | 2024-01-02 | 2345 | 2024-01-01T15:11:22.345 | 13435 |
| US | 2024-01-03 | 2345 | 2024-01-01T20:22:33.567 | 14564 |
Имеет гораздо лучший коэффициент сжатия, чем несортированный, например, в 2 или даже больше раз:
| country: string | business_date: date | user_id: int | business_dt: timestamp | bytes: long |
|---|---|---|---|---|
| RU | 2024-01-01 | 1234 | 2024-01-01T11:22:33.456 | 25325253525 |
| RU | 2024-01-01 | 6345 | 2024-12-01T23:03:44.567 | 25365 |
| RU | 2024-01-02 | 5234 | 2024-07-01T06:10:56.789 | 45643456747 |
| US | 2024-01-01 | 4582 | 2024-04-01T17:59:00.000 | 362546475 |
| US | 2024-01-02 | 2345 | 2024-09-01T04:24:22.345 | 3235 |
| US | 2024-01-03 | 3575 | 2024-03-01T21:37:33.567 | 346345764 |
Выбор столбцов для сортировки данных действительно зависит от самих данных. Если данные коррелируют с каким-то конкретным столбцом, как в примере выше, где объем трафика коррелирует как с user_id, так и с timestamp, используйте df.sortWithinPartitions("user_id", "timestamp") перед записью данных.
Если df.repartition(N, repartition_columns...) используется в сочетании с df.sortWithinPartitions(sort_columns...), то sort_columns должны начинаться с repartition_columns или быть равными им.
Опции
onetl.connection.db_connection.hive.options.HiveWriteOptions
Bases: GenericOptions
Hive source writing options.
You can pass here key-value items which then will be converted to calls
of :obj:pyspark.sql.readwriter.DataFrameWriter methods.
For example, Hive.WriteOptions(if_exists="append", partitionBy="reg_id") will
be converted to df.write.mode("append").partitionBy("reg_id") call, and so on.
Examples
.. note::
You can pass any method name and its value
`supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!
The set of supported options depends on Spark version used.
.. code:: python
from onetl.connection import Hive
options = Hive.WriteOptions(
if_exists="append",
partitionBy="reg_id",
customSparkOption="value",
)
Source code in onetl/connection/db_connection/hive/options.py
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 | |
bucket_by = Field(default=None, alias='bucketBy')
class-attribute
instance-attribute
Number of buckets plus bucketing columns. None means bucketing is disabled.
Each bucket is created as a set of files with name containing result of calculation hash(columns) mod num_buckets.
This allows to remove shuffle from queries containing GROUP BY or JOIN or using = / IN predicates
on specific columns.
Examples: (10, "user_id"), (10, ["user_id", "user_phone"])
.. note::
Bucketing should be used on columns containing a lot of unique values,
like ``userId``.
Columns like ``date`` should **NOT** be used for bucketing
because of too low number of unique values.
.. warning::
It is recommended to use this option **ONLY** if you have a large table
(hundreds of Gb or more), which is used mostly for JOINs with other tables,
and you're inserting data using ``if_exists=overwrite_partitions`` or ``if_exists=replace_entire_table``.
Otherwise Spark will create a lot of small files
(one file for each bucket and each executor), drastically **decreasing** HDFS performance.
.. warning::
Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
compression = None
class-attribute
instance-attribute
Compressing algorithm which should be used for compressing created files in HDFS.
None means compression is disabled.
Examples: snappy, zlib
.. warning::
Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
format = 'orc'
class-attribute
instance-attribute
Format of files which should be used for storing table data.
Examples
- string format:
"orc"(default),"parquet","csv"(NOT recommended). - format class instance:
ORC(compression="snappy"),Parquet(),CSV(header=True, delimiter=",").
.. code::
options = Hive.WriteOptions(
if_exists="append",
partitionBy="reg_id",
format="orc",
)
# or using an ORC format class instance:
from onetl.file.format import ORC
options = Hive.WriteOptions(
if_exists="append",
partitionBy="reg_id",
format=ORC(compression="snappy"),
)
.. note::
It's better to use column-based formats like ``orc`` or ``parquet``,
not row-based (``csv``, ``json``)
.. warning::
Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
if_exists = Field(default=(HiveTableExistBehavior.APPEND), alias=(avoid_alias('mode')))
class-attribute
instance-attribute
Behavior of writing data into existing table.
Possible values
-
append(default) Appends data into existing partition/table, or create partition/table if it does not exist.Same as Spark's
df.write.insertInto(table, overwrite=False)... dropdown:: Behavior in details
* Table does not exist Table is created using options provided by user (``format``, ``compression``, etc). * Table exists, but not partitioned, :obj:`~partition_by` is set Data is appended to a table. Table is still not partitioned (DDL is unchanged). * Table exists and partitioned, but has different partitioning schema than :obj:`~partition_by` Partition is created based on table's ``PARTITIONED BY (...)`` options. Explicit :obj:`~partition_by` value is ignored. * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in dataframe Partition is created. * Table exists and partitioned according :obj:`~partition_by`, partition is present in both dataframe and table Data is appended to existing partition. .. warning:: This mode does not check whether table already contains rows from dataframe, so duplicated rows can be created. To implement deduplication, write data to staging table first, and then perform some deduplication logic using :obj:`~sql`. * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in table, not dataframe Existing partition is left intact. -
replace_overlapping_partitionsOverwrites data in the existing partition, or create partition/table if it does not exist.Same as Spark's
df.write.insertInto(table, overwrite=True)+spark.sql.sources.partitionOverwriteMode=dynamic... DANGER::
This mode does make sense **ONLY** if the table is partitioned. **IF NOT, YOU'LL LOSE YOUR DATA!**.. dropdown:: Behavior in details
* Table does not exist Table is created using options provided by user (``format``, ``compression``, etc). * Table exists, but not partitioned, :obj:`~partition_by` is set Data is **overwritten in all the table**. Table is still not partitioned (DDL is unchanged). * Table exists and partitioned, but has different partitioning schema than :obj:`~partition_by` Partition is created based on table's ``PARTITIONED BY (...)`` options. Explicit :obj:`~partition_by` value is ignored. * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in dataframe Partition is created. * Table exists and partitioned according :obj:`~partition_by`, partition is present in both dataframe and table Existing partition **replaced** with data from dataframe. * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in table, not dataframe Existing partition is left intact. -
replace_entire_tableRecreates table (viaDROP + CREATE), deleting all existing data. All existing partitions are dropped.Same as Spark's
df.write.saveAsTable(table, mode="overwrite")(NOTinsertInto)!.. warning::
Table is recreated using options provided by user (``format``, ``compression``, etc) **instead of using original table options**. Be careful -
ignoreIgnores the write operation if the table/partition already exists... dropdown:: Behavior in details
* Table does not exist Table is created using options provided by user (``format``, ``compression``, etc). * Table exists If the table exists, **no further action is taken**. This is true whether or not new partition values are present and whether the partitioning scheme differs or not -
errorRaises an error if the table/partition already exists... dropdown:: Behavior in details
* Table does not exist Table is created using options provided by user (``format``, ``compression``, etc). * Table exists If the table exists, **raises an error**. This is true whether or not new partition values are present and whether the partitioning scheme differs or not
.. note::
Unlike using pure Spark, config option ``spark.sql.sources.partitionOverwriteMode``
does not affect behavior.
partition_by = Field(default=None, alias='partitionBy')
class-attribute
instance-attribute
List of columns should be used for data partitioning. None means partitioning is disabled.
Examples: reg_id or ["reg_id", "business_dt"]
.. warning::
Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
sort_by = Field(default=None, alias='sortBy')
class-attribute
instance-attribute
Each file in a bucket will be sorted by these columns value. None means sorting is disabled.
Examples: user_id or ["user_id", "user_phone"]
.. note::
Sorting columns should contain values which are used in ``ORDER BY`` clauses.
.. warning::
Could be used only with :obj:`~bucket_by` option
.. warning::
Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
table_properties = Field(default_factory=dict)
class-attribute
instance-attribute
TBLPROPERTIES to add to freshly created table.
.. versionadded:: 0.15.0
Examples: {"auto.purge": True}
.. warning::
Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | |