
Writing data to Hive with DBWriter

To write data to Hive, use DBWriter.

Examples

```python
from onetl.connection import Hive
from onetl.db import DBWriter

hive = Hive(...)

df = ...  # data is here

# Create a dataframe with a specific number of Spark partitions.
# Group data by the Hive partitioning columns, creating at most 20 files per Hive partition.
# Also sort the data by the column it correlates with most (e.g. user_id), reducing file size.

num_files_per_partition = 20
partition_columns = ["country", "business_date"]
sort_columns = ["user_id"]
write_df = df.repartition(
    num_files_per_partition,
    *partition_columns,
    *sort_columns,
).sortWithinPartitions(*partition_columns, *sort_columns)

writer = DBWriter(
    connection=hive,
    target="schema.table",
    options=Hive.WriteOptions(
        if_exists="append",
        # Hive partitioning columns.
        partitionBy=partition_columns,
    ),
)

writer.run(write_df)
```

Recommendations

Use columnar write formats

Preferred write formats are the columnar ones: ORC (the default) and Parquet.

Warning

When using DBWriter, the default Spark data source format configured in spark.sql.sources.default is ignored, because the default value of Hive.WriteOptions(format=...) is explicitly set to orc.

In columnar write formats, each file contains separate sections where column data is stored. The file footer contains information about the location of each column section/group. Spark can use this information to load only the sections required by a specific query, e.g. only the selected columns, significantly speeding up query execution.

Another advantage is a high compression ratio, e.g. 10-100x compared to JSON or CSV.

Use partitioning

How it works

Hive supports splitting data into partitions, which are separate directories in the filesystem with names like some_col=value1/another_col=value2.

For example, a dataframe with the following contents:

| country: string | business_date: date | user_id: int | bytes: long |
|---|---|---|---|
| RU | 2024-01-01 | 1234 | 25325253525 |
| RU | 2024-01-01 | 2345 | 23234535243 |
| RU | 2024-01-02 | 1234 | 62346634564 |
| US | 2024-01-01 | 5678 | 4252345354 |
| US | 2024-01-02 | 5678 | 5474575745 |
| US | 2024-01-03 | 5678 | 3464574567 |

With partitionBy=["country", "business_date"], the data will be stored as files in the following subdirectories:

  • /country=RU/business_date=2024-01-01/
  • /country=RU/business_date=2024-01-02/
  • /country=US/business_date=2024-01-01/
  • /country=US/business_date=2024-01-02/
  • /country=US/business_date=2024-01-03/

A separate subdirectory is created for each unique combination of partition column values in the dataframe.
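The naming scheme can be sketched in plain Python (an illustration only; the rows and helper below are made up, not onETL API):

```python
# Derive a Hive-style partition subdirectory from a row's values.
def partition_path(row, partition_columns):
    return "/".join(f"{col}={row[col]}" for col in partition_columns)

rows = [
    {"country": "RU", "business_date": "2024-01-01", "user_id": 1234},
    {"country": "RU", "business_date": "2024-01-01", "user_id": 2345},
    {"country": "US", "business_date": "2024-01-03", "user_id": 5678},
]

# One subdirectory per unique combination of partition column values:
# three rows produce only two subdirectories here.
subdirs = sorted({partition_path(r, ["country", "business_date"]) for r in rows})
print(subdirs)
```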

Please do not confuse Spark dataframe partitions (i.e. batches of data processed by Spark executors, usually in parallel) with Hive partitioning (storing data in different subdirectories). The number of Spark dataframe partitions correlates with the number of files created in each Hive partition. For example, a Spark dataframe with 10 partitions and 5 distinct values of the Hive partition columns will be saved as 5 subdirectories with 10 files each = 50 files in total. Without Hive partitioning, all files are placed in one flat directory.

But why?

Queries with a WHERE condition filtering on Hive partition columns, e.g. WHERE country = 'RU' AND business_date = '2024-01-01', read only files from that specific partition, e.g. /country=RU/business_date=2024-01-01/, and skip files from all other partitions.

This significantly improves performance and reduces the amount of memory used by Spark. Using Hive partitioning is recommended for all tables.
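Conceptually, partition pruning is directory filtering: predicates on partition columns are resolved against directory names before any file contents are read. A pure-Python sketch (the prune helper is hypothetical, for illustration only):

```python
# Partition directories as they would appear on HDFS/S3.
partitions = [
    "country=RU/business_date=2024-01-01",
    "country=RU/business_date=2024-01-02",
    "country=US/business_date=2024-01-01",
]

def prune(dirs, **filters):
    """Keep only directories whose key=value components match all filters."""
    def matches(path):
        kv = dict(part.split("=", 1) for part in path.split("/"))
        return all(kv.get(key) == value for key, value in filters.items())
    return [d for d in dirs if matches(d)]

# Only the matching directory is read; the others are skipped entirely.
print(prune(partitions, country="RU", business_date="2024-01-01"))
```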

Which columns should be used?

Hive partition columns are usually based on an event date or a location, e.g. country: string, business_date: date, run_date: date, and so on.

Partition columns must contain low-cardinality data. Dates, small integers, and strings with few possible values are a good fit. Timestamps, floats, decimals, long integers (e.g. a user ID), and strings with many unique values (e.g. a user name or email) should NOT be used as Hive partition columns. Unlike some other databases, Hive does not support range-based or hash-based partitioning here.
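A quick way to sanity-check a candidate column is to count its distinct values. The helper and the 10,000 threshold below are arbitrary assumptions for illustration, not an onETL or Hive rule:

```python
# Heuristic: a column is a reasonable partition key only if its cardinality is low.
def is_low_cardinality(values, max_unique=10_000):
    return len(set(values)) <= max_unique

# Dates repeat heavily -> low cardinality, fine for partitioning.
business_dates = [f"2024-01-{d:02d}" for d in range(1, 31)] * 1000
# User IDs are nearly all unique -> too many partitions, do not use.
user_ids = list(range(30_000))

print(is_low_cardinality(business_dates))  # True
print(is_low_cardinality(user_ids))        # False
```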

A partition column must be part of the dataframe. If you want to partition by the date component of a business_dt: timestamp column, add a new column to the dataframe like this: df.withColumn("business_date", to_date(df.business_dt)), using pyspark.sql.functions.to_date.

Use compression

Compression algorithms such as snappy, lz4, or zstd can reduce file size (up to 10x).
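Compression is configured via the compression option of Hive.WriteOptions (a configuration sketch based on the option reference below; note it only takes effect when a new table is created, or with if_exists="replace_entire_table"):

```python
from onetl.connection import Hive

options = Hive.WriteOptions(
    if_exists="append",
    partitionBy=["country", "business_date"],
    # Applied only while creating a new table (or replacing the entire table).
    compression="snappy",
)
```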

Prefer creating large files

HDFS and S3 are not designed for storing millions of small files. Files should be at least 10MB, and usually 128MB+ or 256MB+ (the HDFS block size). NEVER create files just a few kilobytes in size.

The resulting number of files can vary from case to case. On the one hand, Spark Adaptive Query Execution (AQE) can merge small Spark dataframe partitions into one larger partition. On the other hand, dataframes with skewed data can produce more files than expected.

To create a small number of large files, reduce the number of Spark dataframe partitions. Use df.repartition(N, columns...), e.g. df.repartition(20, "col1", "col2"). This creates a new Spark dataframe whose partitions are assigned using the expression hash(df.col1 + df.col2) mod 20, helping to avoid data skew.
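The assignment rule can be sketched in plain Python. Spark actually uses a Murmur3-based hash, not Python's built-in hash; this sketch only shows the mod-N routing idea:

```python
# Sketch of hash-based assignment as in df.repartition(N, "col1", "col2"):
# rows with identical column values always land in the same partition index.
N = 20

def target_partition(col1, col2, n=N):
    return hash((col1, col2)) % n

rows = [("RU", "2024-01-01"), ("RU", "2024-01-01"), ("US", "2024-01-02")]
indexes = [target_partition(*r) for r in rows]

# Identical keys -> identical index; every index is within [0, N).
print(indexes)
```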

Note: larger dataframe partitions require more resources (CPU, RAM) on a Spark executor. The exact number of partitions should be determined empirically, as it depends on the data volume and the available resources.

Sort data before writing

A dataframe with sorted contents:

| country: string | business_date: date | user_id: int | business_dt: timestamp | bytes: long |
|---|---|---|---|---|
| RU | 2024-01-01 | 1234 | 2024-01-01T11:22:33.456 | 25325253525 |
| RU | 2024-01-01 | 1234 | 2024-01-01T12:23:44.567 | 25325253525 |
| RU | 2024-01-02 | 1234 | 2024-01-01T13:25:56.789 | 34335645635 |
| US | 2024-01-01 | 2345 | 2024-01-01T10:00:00.000 | 12341 |
| US | 2024-01-02 | 2345 | 2024-01-01T15:11:22.345 | 13435 |
| US | 2024-01-03 | 2345 | 2024-01-01T20:22:33.567 | 14564 |

has a much better compression ratio than an unsorted one, e.g. 2x or even more:

| country: string | business_date: date | user_id: int | business_dt: timestamp | bytes: long |
|---|---|---|---|---|
| RU | 2024-01-01 | 1234 | 2024-01-01T11:22:33.456 | 25325253525 |
| RU | 2024-01-01 | 6345 | 2024-12-01T23:03:44.567 | 25365 |
| RU | 2024-01-02 | 5234 | 2024-07-01T06:10:56.789 | 45643456747 |
| US | 2024-01-01 | 4582 | 2024-04-01T17:59:00.000 | 362546475 |
| US | 2024-01-02 | 2345 | 2024-09-01T04:24:22.345 | 3235 |
| US | 2024-01-03 | 3575 | 2024-03-01T21:37:33.567 | 346345764 |

The choice of sorting columns really depends on the data itself. If the data correlates with a specific column, as in the example above where traffic volume correlates with both user_id and timestamp, use df.sortWithinPartitions("user_id", "timestamp") before writing the data.
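The compression gain from sorting can be demonstrated with a small pure-Python sketch: zlib stands in for the real ORC/Parquet codecs, and the random user IDs are made up.

```python
import random
import zlib

# 10,000 random 4-digit "user IDs": many duplicates, scattered when unsorted.
random.seed(42)
user_ids = [random.randint(1000, 9999) for _ in range(10_000)]

unsorted_bytes = ",".join(map(str, user_ids)).encode()
sorted_bytes = ",".join(map(str, sorted(user_ids))).encode()

# Sorting places duplicates and near-duplicates next to each other,
# so the sorted payload compresses noticeably better.
print(len(zlib.compress(unsorted_bytes)), len(zlib.compress(sorted_bytes)))
```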

If df.repartition(N, repartition_columns...) is used in combination with df.sortWithinPartitions(sort_columns...), then sort_columns should start with, or be equal to, repartition_columns.

Options

onetl.connection.db_connection.hive.options.HiveWriteOptions

Bases: GenericOptions

Hive source writing options.

You can pass here key-value items which then will be converted to calls of pyspark.sql.readwriter.DataFrameWriter methods.

For example, Hive.WriteOptions(if_exists="append", partitionBy="reg_id") will be converted to the df.write.mode("append").partitionBy("reg_id") call, and so on.

Examples

Note

You can pass any method name and value supported by Spark (https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html), even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on the Spark version used.

```python
from onetl.connection import Hive

options = Hive.WriteOptions(
    if_exists="append",
    partitionBy="reg_id",
    customSparkOption="value",
)
```
Source code in onetl/connection/db_connection/hive/options.py
class HiveWriteOptions(GenericOptions):
    """Hive source writing options.

    You can pass here key-value items which then will be converted to calls
    of :obj:`pyspark.sql.readwriter.DataFrameWriter` methods.

    For example, ``Hive.WriteOptions(if_exists="append", partitionBy="reg_id")`` will
    be converted to ``df.write.mode("append").partitionBy("reg_id")`` call, and so on.

    Examples
    --------

    .. note::

        You can pass any method name and its value
        `supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on Spark version used.

    .. code:: python

        from onetl.connection import Hive

        options = Hive.WriteOptions(
            if_exists="append",
            partitionBy="reg_id",
            customSparkOption="value",
        )
    """

    class Config:
        extra = "allow"
        known_options: frozenset = frozenset()

    if_exists: HiveTableExistBehavior = Field(  # type: ignore[literal-required]
        default=HiveTableExistBehavior.APPEND,
        alias=avoid_alias("mode"),
    )
    """Behavior of writing data into existing table.

    Possible values:
        * ``append`` (default)
            Appends data into existing partition/table, or create partition/table if it does not exist.

            Same as Spark's ``df.write.insertInto(table, overwrite=False)``.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user (``format``, ``compression``, etc).

                * Table exists, but not partitioned, :obj:`~partition_by` is set
                    Data is appended to a table. Table is still not partitioned (DDL is unchanged).

                * Table exists and partitioned, but has different partitioning schema than :obj:`~partition_by`
                    Partition is created based on table's ``PARTITIONED BY (...)`` options.
                    Explicit :obj:`~partition_by` value is ignored.

                * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in dataframe
                    Partition is created.

                * Table exists and partitioned according :obj:`~partition_by`, partition is present in both dataframe and table
                    Data is appended to existing partition.

                    .. warning::

                        This mode does not check whether table already contains
                        rows from dataframe, so duplicated rows can be created.

                        To implement deduplication, write data to staging table first,
                        and then perform some deduplication logic using :obj:`~sql`.

                * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in table, not dataframe
                    Existing partition is left intact.

        * ``replace_overlapping_partitions``
            Overwrites data in the existing partition, or create partition/table if it does not exist.

            Same as Spark's ``df.write.insertInto(table, overwrite=True)`` +
            ``spark.sql.sources.partitionOverwriteMode=dynamic``.

            .. DANGER::

                This mode does make sense **ONLY** if the table is partitioned.
                **IF NOT, YOU'LL LOSE YOUR DATA!**

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user (``format``, ``compression``, etc).

                * Table exists, but not partitioned, :obj:`~partition_by` is set
                    Data is **overwritten in all the table**. Table is still not partitioned (DDL is unchanged).

                * Table exists and partitioned, but has different partitioning schema than :obj:`~partition_by`
                    Partition is created based on table's ``PARTITIONED BY (...)`` options.
                    Explicit :obj:`~partition_by` value is ignored.

                * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in dataframe
                    Partition is created.

                * Table exists and partitioned according :obj:`~partition_by`, partition is present in both dataframe and table
                    Existing partition **replaced** with data from dataframe.

                * Table exists and partitioned according :obj:`~partition_by`, but partition is present only in table, not dataframe
                    Existing partition is left intact.

        * ``replace_entire_table``
            **Recreates table** (via ``DROP + CREATE``), **deleting all existing data**.
            **All existing partitions are dropped.**

            Same as Spark's ``df.write.saveAsTable(table, mode="overwrite")`` (NOT ``insertInto``)!

            .. warning::

                Table is recreated using options provided by user (``format``, ``compression``, etc)
                **instead of using original table options**. Be careful

        * ``ignore``
            Ignores the write operation if the table/partition already exists.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user (``format``, ``compression``, etc).

                * Table exists
                    If the table exists, **no further action is taken**. This is true whether or not new partition
                    values are present and whether the partitioning scheme differs or not

        * ``error``
            Raises an error if the table/partition already exists.

            .. dropdown:: Behavior in details

                * Table does not exist
                    Table is created using options provided by user (``format``, ``compression``, etc).

                * Table exists
                    If the table exists, **raises an error**. This is true whether or not new partition
                    values are present and whether the partitioning scheme differs or not


    .. note::

        Unlike using pure Spark, config option ``spark.sql.sources.partitionOverwriteMode``
        does not affect behavior.
    """

    format: Union[str, BaseWritableFileFormat] = "orc"
    """Format of files which should be used for storing table data.

    Examples
    --------

    - string format: ``"orc"`` (default), ``"parquet"``, ``"csv"`` (NOT recommended).
    - format class instance: ``ORC(compression="snappy")``, ``Parquet()``, ``CSV(header=True, delimiter=",")``.

    .. code::

        options = Hive.WriteOptions(
            if_exists="append",
            partitionBy="reg_id",
            format="orc",
        )

        # or using an ORC format class instance:

        from onetl.file.format import ORC

        options = Hive.WriteOptions(
            if_exists="append",
            partitionBy="reg_id",
            format=ORC(compression="snappy"),
        )

    .. note::

        It's better to use column-based formats like ``orc`` or ``parquet``,
        not row-based (``csv``, ``json``)

    .. warning::

        Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
    """

    partition_by: Optional[Union[List[str], str]] = Field(default=None, alias="partitionBy")
    """
    List of columns should be used for data partitioning. ``None`` means partitioning is disabled.

    Examples: ``reg_id`` or ``["reg_id", "business_dt"]``

    .. warning::

        Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
    """

    bucket_by: Optional[Tuple[int, Union[List[str], str]]] = Field(default=None, alias="bucketBy")  # noqa: WPS234
    """Number of buckets plus bucketing columns. ``None`` means bucketing is disabled.

    Each bucket is created as a set of files with name containing result of calculation ``hash(columns) mod num_buckets``.

    This allows to remove shuffle from queries containing ``GROUP BY`` or ``JOIN`` or using ``=`` / ``IN`` predicates
    on specific columns.

    Examples: ``(10, "user_id")``, ``(10, ["user_id", "user_phone"])``

    .. note::

        Bucketing should be used on columns containing a lot of unique values,
        like ``userId``.

        Columns like ``date`` should **NOT** be used for bucketing
        because of too low number of unique values.

    .. warning::

        It is recommended to use this option **ONLY** if you have a large table
        (hundreds of Gb or more), which is used mostly for JOINs with other tables,
        and you're inserting data using ``if_exists=overwrite_partitions`` or ``if_exists=replace_entire_table``.

        Otherwise Spark will create a lot of small files
        (one file for each bucket and each executor), drastically **decreasing** HDFS performance.

    .. warning::

        Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
    """

    sort_by: Optional[Union[List[str], str]] = Field(default=None, alias="sortBy")
    """Each file in a bucket will be sorted by these columns value. ``None`` means sorting is disabled.

    Examples: ``user_id`` or ``["user_id", "user_phone"]``

    .. note::

        Sorting columns should contain values which are used in ``ORDER BY`` clauses.

    .. warning::

        Could be used only with :obj:`~bucket_by` option

    .. warning::

        Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
    """

    compression: Optional[str] = None
    """Compressing algorithm which should be used for compressing created files in HDFS.
    ``None`` means compression is disabled.

    Examples: ``snappy``, ``zlib``

    .. warning::

        Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
    """

    table_properties: Dict[str, Any] = Field(default_factory=dict)
    """TBLPROPERTIES to add to freshly created table.

    .. versionadded:: 0.15.0

    Examples: ``{"auto.purge": True}``

    .. warning::

        Used **only** while **creating new table**, or in case of ``if_exists=replace_entire_table``
    """

    @validator("sort_by")
    def _sort_by_cannot_be_used_without_bucket_by(cls, sort_by, values):
        options = values.copy()
        bucket_by = options.pop("bucket_by", None)
        if sort_by and not bucket_by:
            raise ValueError("`sort_by` option can only be used with non-empty `bucket_by`")

        return sort_by

    @root_validator
    def _partition_overwrite_mode_is_not_allowed(cls, values):
        partition_overwrite_mode = values.get("partitionOverwriteMode") or values.get("partition_overwrite_mode")
        if partition_overwrite_mode:
            if partition_overwrite_mode == "static":
                recommend_mode = "replace_entire_table"
            else:
                recommend_mode = "replace_overlapping_partitions"
            raise ValueError(
                f"`partitionOverwriteMode` option should be replaced with if_exists='{recommend_mode}'",
            )

        if values.get("insert_into") is not None or values.get("insertInto") is not None:
            raise ValueError(
                "`insertInto` option was removed in onETL 0.4.0, "
                "now df.write.insertInto or df.write.saveAsTable is selected based on table existence",
            )

        return values

    @root_validator(pre=True)
    def _mode_is_deprecated(cls, values):
        if "mode" in values:
            warnings.warn(
                "Option `Hive.WriteOptions(mode=...)` is deprecated since v0.9.0 and will be removed in v1.0.0. "
                "Use `Hive.WriteOptions(if_exists=...)` instead",
                category=UserWarning,
                stacklevel=3,
            )
        return values

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options