
Options

Bases: FileDFWriteOptions, GenericOptions

Options for :obj:`FileDFWriter <onetl.file.file_df_writer.file_df_writer.FileDFWriter>`.

.. versionadded:: 0.9.0

Examples

.. note::

You can pass any value `supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

The set of supported options depends on Spark version.

.. code:: python

from onetl.file import FileDFWriter

options = FileDFWriter.Options(
    if_exists="replace_overlapping_partitions",
    partitionBy="month",
)
Source code in onetl/file/file_df_writer/options.py
@support_hooks
class FileDFWriterOptions(FileDFWriteOptions, GenericOptions):
    """Options for :obj:`FileDFWriter <onetl.file.file_df_writer.file_df_writer.FileDFWriter>`.

    .. versionadded:: 0.9.0

    Examples
    --------

    .. note::

        You can pass any value `supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on Spark version.

    .. code:: python

        from onetl.file import FileDFWriter

        options = FileDFWriter.Options(
            if_exists="replace_overlapping_partitions",
            partitionBy="month",
        )
    """

    class Config:
        extra = "allow"
        prohibited_options = frozenset(("partitionOverwriteMode",))

    if_exists: FileDFExistBehavior = FileDFExistBehavior.APPEND
    """Behavior for existing target directory.

    If target directory does not exist, it will be created.
    But if it does exist, then behavior is different for each value.

    .. versionchanged:: 0.13.0

        Default value was changed from ``error`` to ``append``

    Possible values:
        * ``error``
            If folder already exists, raise an exception.

            Same as Spark's ``df.write.mode("error").save()``.

        * ``skip_entire_directory``
            If the folder already exists, leave existing files intact and stop immediately without any errors.

            Same as Spark's ``df.write.mode("ignore").save()``.

        * ``append`` (default)
            Appends data into existing directory.

            .. dropdown:: Behavior in details

                * Directory does not exist
                    Directory is created using all the provided options (``format``, ``partition_by``, etc).

                * Directory exists, does not contain partitions, but :obj:`~partition_by` is set
                    Data is appended to the directory, but using a partitioned directory structure.

                    .. warning::

                        Existing files are still present in the root of the directory, but Spark will ignore them while reading,
                        unless ``recursive=True`` is used.

                * Directory exists and contains partitions, but :obj:`~partition_by` is not set
                    Data is appended to the root of the directory instead of nested partition directories.

                    .. warning::

                        Spark will ignore such files while reading, unless using ``recursive=True``.

                * Directory exists and contains partitions, but with a different partitioning schema than :obj:`~partition_by`
                    Data is appended to a directory with new partitioning schema.

                    .. warning::

                        Spark cannot read directory with multiple partitioning schemas,
                        unless using ``recursive=True`` to disable partition scanning.

                * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the dataframe
                    New partition directory is created.

                * Directory exists and is partitioned according to :obj:`~partition_by`, and the partition is present in both the dataframe and the directory
                    New files are added to the existing partition directory, existing files are still present.

                * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the directory, not the dataframe
                    Existing partition is left intact.

        * ``replace_overlapping_partitions``
            If partitions from dataframe already exist in directory structure, they will be overwritten.

            Same as Spark's ``df.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").save()``.

            .. DANGER::

                This mode makes sense **ONLY** if the directory is partitioned.
                **IF NOT, YOU'LL LOSE YOUR DATA!**

            .. dropdown:: Behavior in details

                * Directory does not exist
                    Directory is created using all the provided options (``format``, ``partition_by``, etc).

                * Directory exists, does not contain partitions, but :obj:`~partition_by` is set
                    Directory **will be deleted**, and will be created with partitions.

                * Directory exists and contains partitions, but :obj:`~partition_by` is not set
                    Directory **will be deleted**, and will be created with partitions.

                * Directory exists and contains partitions, but with a different partitioning schema than :obj:`~partition_by`
                    Data is appended to a directory with new partitioning schema.

                    .. warning::

                        Spark cannot read directory with multiple partitioning schemas,
                        unless using ``recursive=True`` to disable partition scanning.

                * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the dataframe
                    New partition directory is created.

                * Directory exists and is partitioned according to :obj:`~partition_by`, and the partition is present in both the dataframe and the directory
                    Partition directory **will be deleted**, and a new one is created with files containing data from the dataframe.

                * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the directory, not the dataframe
                    Existing partition is left intact.

        * ``replace_entire_directory``
            Remove existing directory and create new one, **overwriting all existing data**.
            **All existing partitions are dropped.**

            Same as Spark's ``df.write.option("partitionOverwriteMode", "static").mode("overwrite").save()``.

    .. note::

        Unlike using pure Spark, the config option ``spark.sql.sources.partitionOverwriteMode``
        does not affect the behavior of any ``mode``.
    """

    partition_by: Optional[Union[List[str], str]] = Field(default=None, alias="partitionBy")
    """
    List of columns to use for data partitioning. ``None`` means partitioning is disabled.

    Each partition is a folder which contains only files with the specific column value,
    like ``some.csv/col1=value1``, ``some.csv/col1=value2``, and so on.

    Multiple partition columns mean a nested folder structure, like ``some.csv/col1=val1/col2=val2``.

    If ``WHERE`` clause in the query contains expression like ``partition = value``,
    Spark will scan only files in a specific partition.

    Examples: ``reg_id`` or ``["reg_id", "business_dt"]``

    .. note::

        Values should be scalars (integers, strings),
        and either static (``countryId``) or incrementing (dates, years), with a low
        number of distinct values.

        Columns like ``userId`` or ``datetime``/``timestamp`` should **NOT** be used for partitioning.
    """

    @slot
    def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
        """
        Apply provided options to :obj:`pyspark.sql.DataFrameWriter`. |support_hooks|

        Returns
        -------
        :obj:`pyspark.sql.DataFrameWriter` with options applied
        """
        for method, value in self.dict(by_alias=True, exclude_none=True, exclude={"if_exists"}).items():
            # <value> is the argument(s) that will be passed to writer.<method>,
            # e.g. .format("orc") or .partitionBy("month")
            if hasattr(writer, method):
                if isinstance(value, Iterable) and not isinstance(value, str):
                    writer = getattr(writer, method)(*value)  # noqa: WPS220
                else:
                    writer = getattr(writer, method)(value)  # noqa: WPS220
            else:
                writer = writer.option(method, value)

        if self.if_exists == FileDFExistBehavior.REPLACE_OVERLAPPING_PARTITIONS:
            writer = writer.mode("overwrite").option("partitionOverwriteMode", "dynamic")
        elif self.if_exists == FileDFExistBehavior.REPLACE_ENTIRE_DIRECTORY:
            writer = writer.mode("overwrite").option("partitionOverwriteMode", "static")
        elif self.if_exists == FileDFExistBehavior.SKIP_ENTIRE_DIRECTORY:
            writer = writer.mode("ignore")
        elif self.if_exists == FileDFExistBehavior.APPEND:
            writer = writer.mode("append")
        else:
            writer = writer.mode("error")

        return writer

    @root_validator(pre=True)
    def _mode_is_restricted(cls, values):
        if "mode" in values:
            raise ValueError("Parameter `mode` is not allowed. Please use `if_exists` parameter instead.")
        return values

    @root_validator
    def _partition_overwrite_mode_is_not_allowed(cls, values):
        partition_overwrite_mode = values.get("partitionOverwriteMode") or values.get("partition_overwrite_mode")
        if partition_overwrite_mode:
            if partition_overwrite_mode == "static":
                recommended_mode = "replace_entire_directory"
            else:
                recommended_mode = "replace_overlapping_partitions"

            raise ValueError(
                f"`partitionOverwriteMode` option should be replaced with if_exists='{recommended_mode}'",
            )
        return values

if_exists = FileDFExistBehavior.APPEND class-attribute instance-attribute

Behavior for existing target directory.

If target directory does not exist, it will be created. But if it does exist, then behavior is different for each value.

.. versionchanged:: 0.13.0

Default value was changed from ``error`` to ``append``
Possible values
  • error If folder already exists, raise an exception.

    Same as Spark's df.write.mode("error").save().

  • skip_entire_directory If the folder already exists, leave existing files intact and stop immediately without any errors.

    Same as Spark's df.write.mode("ignore").save().

  • append (default) Appends data into existing directory.

    .. dropdown:: Behavior in details

    * Directory does not exist
        Directory is created using all the provided options (``format``, ``partition_by``, etc).
    
    * Directory exists, does not contain partitions, but :obj:`~partition_by` is set
        Data is appended to the directory, but using a partitioned directory structure.
    
        .. warning::
    
            Existing files are still present in the root of the directory, but Spark will ignore them while reading,
            unless ``recursive=True`` is used.
    
    * Directory exists and contains partitions, but :obj:`~partition_by` is not set
        Data is appended to the root of the directory instead of nested partition directories.
    
        .. warning::
    
            Spark will ignore such files while reading, unless using ``recursive=True``.
    
    * Directory exists and contains partitions, but with a different partitioning schema than :obj:`~partition_by`
        Data is appended to a directory with new partitioning schema.
    
        .. warning::
    
            Spark cannot read directory with multiple partitioning schemas,
            unless using ``recursive=True`` to disable partition scanning.
    
    * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the dataframe
        New partition directory is created.

    * Directory exists and is partitioned according to :obj:`~partition_by`, and the partition is present in both the dataframe and the directory
        New files are added to the existing partition directory, existing files are still present.

    * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the directory, not the dataframe
        Existing partition is left intact.
    
  • replace_overlapping_partitions If partitions from dataframe already exist in directory structure, they will be overwritten.

    Same as Spark's df.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").save().

    .. DANGER::

    This mode makes sense **ONLY** if the directory is partitioned.
    **IF NOT, YOU'LL LOSE YOUR DATA!**
    

    .. dropdown:: Behavior in details

    * Directory does not exist
        Directory is created using all the provided options (``format``, ``partition_by``, etc).
    
    * Directory exists, does not contain partitions, but :obj:`~partition_by` is set
        Directory **will be deleted**, and will be created with partitions.
    
    * Directory exists and contains partitions, but :obj:`~partition_by` is not set
        Directory **will be deleted**, and will be created with partitions.
    
    * Directory exists and contains partitions, but with a different partitioning schema than :obj:`~partition_by`
        Data is appended to a directory with new partitioning schema.
    
        .. warning::
    
            Spark cannot read directory with multiple partitioning schemas,
            unless using ``recursive=True`` to disable partition scanning.
    
    * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the dataframe
        New partition directory is created.

    * Directory exists and is partitioned according to :obj:`~partition_by`, and the partition is present in both the dataframe and the directory
        Partition directory **will be deleted**, and a new one is created with files containing data from the dataframe.

    * Directory exists and is partitioned according to :obj:`~partition_by`, but the partition is present only in the directory, not the dataframe
        Existing partition is left intact.
    
  • replace_entire_directory Remove the existing directory and create a new one, overwriting all existing data. All existing partitions are dropped.

    Same as Spark's df.write.option("partitionOverwriteMode", "static").mode("overwrite").save().

.. note::

Unlike using pure Spark, the config option ``spark.sql.sources.partitionOverwriteMode``
does not affect the behavior of any ``mode``.
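The partition-level semantics above can be sketched with a small model: treat the target directory as a mapping of partition name to a list of files, and merge incoming partitions according to each mode. This is a simplified illustration of the semantics only, not onetl's actual file handling:

```python
def merge_partitions(existing, incoming, if_exists):
    """Simulate how each if_exists mode treats partition directories.

    existing/incoming: dict mapping partition name -> list of file names.
    """
    if if_exists == "error" and existing:
        raise FileExistsError("target directory already exists")
    if if_exists == "skip_entire_directory" and existing:
        # leave everything intact, stop silently
        return existing
    if if_exists == "replace_entire_directory":
        # only incoming partitions survive
        return {part: files[:] for part, files in incoming.items()}

    result = {part: files[:] for part, files in existing.items()}
    for part, files in incoming.items():
        if if_exists == "replace_overlapping_partitions":
            # overlapping partition is replaced, others are left intact
            result[part] = files[:]
        else:
            # append: new files are added, nothing is deleted
            result.setdefault(part, []).extend(files)
    return result


existing = {"month=01": ["a.csv"], "month=02": ["b.csv"]}
incoming = {"month=02": ["c.csv"], "month=03": ["d.csv"]}
```

With this model, ``append`` keeps ``a.csv`` and ``b.csv`` and adds the new files, ``replace_overlapping_partitions`` replaces only ``month=02``, and ``replace_entire_directory`` drops ``month=01`` entirely.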

partition_by = Field(default=None, alias='partitionBy') class-attribute instance-attribute

List of columns to use for data partitioning. None means partitioning is disabled.

Each partition is a folder which contains only files with the specific column value, like some.csv/col1=value1, some.csv/col1=value2, and so on.

Multiple partition columns mean a nested folder structure, like some.csv/col1=val1/col2=val2.

If WHERE clause in the query contains expression like partition = value, Spark will scan only files in a specific partition.

Examples: reg_id or ["reg_id", "business_dt"]

.. note::

Values should be scalars (integers, strings),
and either static (``countryId``) or incrementing (dates, years), with a low
number of distinct values.

Columns like ``userId`` or ``datetime``/``timestamp`` should **NOT** be used for partitioning.
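The folder layout described above follows Spark's Hive-style partition naming, which can be sketched by hand (``partition_path`` is a hypothetical helper for illustration only, not part of onetl):

```python
def partition_path(base, partition_by, row):
    """Build a Hive-style partition path like base/col1=v1/col2=v2."""
    parts = [f"{col}={row[col]}" for col in partition_by]
    return "/".join([base, *parts])


path = partition_path(
    "some.csv",
    ["reg_id", "business_dt"],
    {"reg_id": 77, "business_dt": "2024-01-01", "value": 10},
)
# files for this row land under some.csv/reg_id=77/business_dt=2024-01-01
```

A ``WHERE reg_id = 77`` filter then lets Spark scan only the ``reg_id=77`` subtree instead of the whole directory.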

apply_to_writer(writer)

Apply provided options to :obj:`pyspark.sql.DataFrameWriter`. |support_hooks|

Returns

:obj:`pyspark.sql.DataFrameWriter` with options applied

Source code in onetl/file/file_df_writer/options.py
@slot
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
    """
    Apply provided options to :obj:`pyspark.sql.DataFrameWriter`. |support_hooks|

    Returns
    -------
    :obj:`pyspark.sql.DataFrameWriter` with options applied
    """
    for method, value in self.dict(by_alias=True, exclude_none=True, exclude={"if_exists"}).items():
        # <value> is the argument(s) that will be passed to writer.<method>,
        # e.g. .format("orc") or .partitionBy("month")
        if hasattr(writer, method):
            if isinstance(value, Iterable) and not isinstance(value, str):
                writer = getattr(writer, method)(*value)  # noqa: WPS220
            else:
                writer = getattr(writer, method)(value)  # noqa: WPS220
        else:
            writer = writer.option(method, value)

    if self.if_exists == FileDFExistBehavior.REPLACE_OVERLAPPING_PARTITIONS:
        writer = writer.mode("overwrite").option("partitionOverwriteMode", "dynamic")
    elif self.if_exists == FileDFExistBehavior.REPLACE_ENTIRE_DIRECTORY:
        writer = writer.mode("overwrite").option("partitionOverwriteMode", "static")
    elif self.if_exists == FileDFExistBehavior.SKIP_ENTIRE_DIRECTORY:
        writer = writer.mode("ignore")
    elif self.if_exists == FileDFExistBehavior.APPEND:
        writer = writer.mode("append")
    else:
        writer = writer.mode("error")

    return writer
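The dispatch loop above can be exercised without Spark by using a minimal stub that records calls. ``StubWriter`` and ``apply_options`` are hypothetical stand-ins for ``pyspark.sql.DataFrameWriter`` and the loop body, written only to illustrate the method-vs-option dispatch:

```python
class StubWriter:
    """Hypothetical stand-in for pyspark.sql.DataFrameWriter that records calls."""

    def __init__(self):
        self.calls = []

    def format(self, fmt):
        self.calls.append(("format", fmt))
        return self

    def partitionBy(self, *cols):
        self.calls.append(("partitionBy", cols))
        return self

    def option(self, key, value):
        self.calls.append(("option", key, value))
        return self

    def mode(self, mode):
        self.calls.append(("mode", mode))
        return self


def apply_options(writer, options):
    """Simplified mirror of the dispatch loop: known writer methods are
    called directly, anything else becomes a generic .option(key, value)."""
    for method, value in options.items():
        if hasattr(writer, method):
            if isinstance(value, (list, tuple)):
                writer = getattr(writer, method)(*value)
            else:
                writer = getattr(writer, method)(value)
        else:
            writer = writer.option(method, value)
    return writer


writer = apply_options(
    StubWriter(),
    {"format": "csv", "partitionBy": ["month"], "header": True},
)
```

Here ``format`` and ``partitionBy`` match writer methods and are invoked directly (a list is unpacked into positional arguments), while ``header`` has no matching method and falls through to ``.option("header", True)``, which is why option names must be in ``camelCase``.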