Skip to content

Опции

Bases: FileDFWriteOptions, GenericOptions

Options for FileDFWriter.

Added in 0.9.0

Examples

Note

You can pass any value supported by Spark, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on Spark version.

from onetl.file import FileDFWriter

options = FileDFWriter.Options(
    if_exists="replace_overlapping_partitions",
    partitionBy="month",
)
Source code in onetl/file/file_df_writer/options.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
@support_hooks
class FileDFWriterOptions(FileDFWriteOptions, GenericOptions):
    """Options for [FileDFWriter][onetl.file.file_df_writer.file_df_writer.FileDFWriter].

    !!! success "Added in 0.9.0"

    Examples
    --------

    !!! note

        You can pass any value [supported by Spark](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html),
        even if it is not mentioned in this documentation. **Option names should be in** `camelCase`!

        The set of supported options depends on Spark version.

    ```python
    from onetl.file import FileDFWriter

    options = FileDFWriter.Options(
        if_exists="replace_overlapping_partitions",
        partitionBy="month",
    )
    ```
    """

    class Config:
        extra = "allow"
        prohibited_options = frozenset(("partitionOverwriteMode",))

    if_exists: FileDFExistBehavior = FileDFExistBehavior.APPEND
    """Behavior for existing target directory.

    If target directory does not exist, it will be created.
    But if it does exist, then behavior is different for each value.

    !!! info "Changed in 0.13.0"

        Default value was changed from `error` to `append`

    Possible values:
        * `error`
            If folder already exists, raise an exception.

            Same as Spark's `df.write.mode("error").save()`.

        * `skip_entire_directory`
            If folder already exists, left existing files intact and stop immediately without any errors.

            Same as Spark's `df.write.mode("ignore").save()`.

        * `append` (default)
            Appends data into existing directory.

            ??? note "Behavior in details"

                * Directory does not exist
                    Directory is created using all the provided options (`format`, `partition_by`, etc).

                * Directory exists, does not contain partitions, but [partition_by][] is set
                    Data is appended to a directory, but to partitioned directory structure.

                    !!! warning

                        Existing files still present in the root of directory,
                        but Spark will ignore those files while reading,
                        unless using `recursive=True`.

                * Directory exists and contains partitions, but [partition_by][] is not set
                    Data is appended to a directory, but to the root of
                    directory instead of nested partition directories.

                    !!! warning

                        Spark will ignore such files while reading, unless using `recursive=True`.

                * Directory exists and contains partitions,
                  but with different partitioning schema than [partition_by][]
                    Data is appended to a directory with new partitioning schema.

                    !!! warning

                        Spark cannot read directory with multiple partitioning schemas,
                        unless using `recursive=True` to disable partition scanning.

                * Directory exists and partitioned according [partition_by][],
                  but partition is present only in dataframe
                    New partition directory is created.

                * Directory exists and partitioned according [partition_by][],
                  partition is present in both dataframe and directory
                    New files are added to existing partition directory, existing files are sill present.

                * Directory exists and partitioned according [partition_by][],
                  but partition is present only in directory, not dataframe
                    Existing partition is left intact.

        * `replace_overlapping_partitions`
            If partitions from dataframe already exist in directory structure, they will be overwritten.

            Same as Spark's `df.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").save()`.

            !!! danger

                This mode does make sense **ONLY** if the directory is partitioned.
                **IF NOT, YOU'LL LOOSE YOUR DATA!**

            ??? note "Behavior in details"

                * Directory does not exist
                    Directory is created using all the provided options
                    (`format`, `partition_by`, etc).

                * Directory exists, does not contain partitions, but [partition_by][] is set
                    Directory **will be deleted**, and will be created with partitions.

                * Directory exists and contains partitions, but [partition_by][] is not set
                    Directory **will be deleted**, and will be created with partitions.

                * Directory exists and contains partitions,
                  but with different partitioning schema than [partition_by][]
                    Data is appended to a directory with new partitioning schema.

                    !!! warning

                        Spark cannot read directory with multiple partitioning schemas,
                        unless using `recursive=True` to disable partition scanning.

                * Directory exists and partitioned according [partition_by][],
                  but partition is present only in dataframe
                    New partition directory is created.

                * Directory exists and partitioned according [partition_by][],
                  partition is present in both dataframe and directory
                    Partition directory **will be deleted**,
                    and new one is created with files containing data from dataframe.

                * Directory exists and partitioned according [partition_by][],
                  but partition is present only in directory, not dataframe
                    Existing partition is left intact.

        * `replace_entire_directory`
            Remove existing directory and create new one, **overwriting all existing data**.
            **All existing partitions are dropped.**

            Same as Spark's `df.write.option("partitionOverwriteMode", "static").mode("overwrite").save()`.

    !!! note

        Unlike using pure Spark, config option `spark.sql.sources.partitionOverwriteMode`
        does not affect behavior of any `mode`
    """

    partition_by: Optional[Union[List[str], str]] = Field(default=None, alias="partitionBy")
    """
    List of columns should be used for data partitioning. `None` means partitioning is disabled.

    Each partition is a folder which contains only files with the specific column value,
    like `some.csv/col1=value1`, `some.csv/col1=value2`, and so on.

    Multiple partitions columns means nested folder structure, like `some.csv/col1=val1/col2=val2`.

    If `WHERE` clause in the query contains expression like `partition = value`,
    Spark will scan only files in a specific partition.

    Examples: `reg_id` or `["reg_id", "business_dt"]`

    !!! note

        Values should be scalars (integers, strings),
        and either static (`countryId`) or incrementing (dates, years), with low
        number of distinct values.

        Columns like `userId` or `datetime`/`timestamp` should **NOT** be used for partitioning.
    """

    @slot
    def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
        """
        Apply provided format to `pyspark.sql.DataFrameWriter`. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

        Returns
        -------
        pyspark.sql.DataFrameWriter
            Writer with options applied.
        """
        for method, value in self.dict(by_alias=True, exclude_none=True, exclude={"if_exists"}).items():
            # <value> is the arguments that will be passed to the <method>
            # format orc, parquet methods and format simultaneously
            if hasattr(writer, method):
                if isinstance(value, Iterable) and not isinstance(value, str):
                    writer = getattr(writer, method)(*value)
                else:
                    writer = getattr(writer, method)(value)
            else:
                writer = writer.option(method, value)

        if self.if_exists == FileDFExistBehavior.REPLACE_OVERLAPPING_PARTITIONS:
            writer = writer.mode("overwrite").option("partitionOverwriteMode", "dynamic")
        elif self.if_exists == FileDFExistBehavior.REPLACE_ENTIRE_DIRECTORY:
            writer = writer.mode("overwrite").option("partitionOverwriteMode", "static")
        elif self.if_exists == FileDFExistBehavior.SKIP_ENTIRE_DIRECTORY:
            writer = writer.mode("ignore")
        elif self.if_exists == FileDFExistBehavior.APPEND:
            writer = writer.mode("append")
        else:
            writer = writer.mode("error")

        return writer

    @root_validator(pre=True)
    def _mode_is_restricted(cls, values):
        if "mode" in values:
            msg = "Parameter `mode` is not allowed. Please use `if_exists` parameter instead."
            raise ValueError(msg)
        return values

    @root_validator
    def _partition_overwrite_mode_is_not_allowed(cls, values):
        partition_overwrite_mode = values.get("partitionOverwriteMode") or values.get("partition_overwrite_mode")
        if partition_overwrite_mode:
            if partition_overwrite_mode == "static":
                recommended_mode = "replace_entire_directory"
            else:
                recommended_mode = "replace_overlapping_partitions"

            msg = f"`partitionOverwriteMode` option should be replaced with if_exists='{recommended_mode}'"
            raise ValueError(msg)
        return values

if_exists = FileDFExistBehavior.APPEND class-attribute instance-attribute

Behavior for existing target directory.

If target directory does not exist, it will be created. But if it does exist, then behavior is different for each value.

Changed in 0.13.0

Default value was changed from error to append

Possible values
  • error If folder already exists, raise an exception.

    Same as Spark's df.write.mode("error").save().

  • skip_entire_directory If folder already exists, left existing files intact and stop immediately without any errors.

    Same as Spark's df.write.mode("ignore").save().

  • append (default) Appends data into existing directory.

    Behavior in details
    • Directory does not exist Directory is created using all the provided options (format, partition_by, etc).

    • Directory exists, does not contain partitions, but [partition_by][] is set Data is appended to a directory, but to partitioned directory structure.

      Warning

      Existing files still present in the root of directory, but Spark will ignore those files while reading, unless using recursive=True.

    • Directory exists and contains partitions, but [partition_by][] is not set Data is appended to a directory, but to the root of directory instead of nested partition directories.

      Warning

      Spark will ignore such files while reading, unless using recursive=True.

    • Directory exists and contains partitions, but with different partitioning schema than [partition_by][] Data is appended to a directory with new partitioning schema.

      Warning

      Spark cannot read directory with multiple partitioning schemas, unless using recursive=True to disable partition scanning.

    • Directory exists and partitioned according [partition_by][], but partition is present only in dataframe New partition directory is created.

    • Directory exists and partitioned according [partition_by][], partition is present in both dataframe and directory New files are added to existing partition directory, existing files are sill present.

    • Directory exists and partitioned according [partition_by][], but partition is present only in directory, not dataframe Existing partition is left intact.

  • replace_overlapping_partitions If partitions from dataframe already exist in directory structure, they will be overwritten.

    Same as Spark's df.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").save().

    Danger

    This mode does make sense ONLY if the directory is partitioned. IF NOT, YOU'LL LOOSE YOUR DATA!

    Behavior in details
    • Directory does not exist Directory is created using all the provided options (format, partition_by, etc).

    • Directory exists, does not contain partitions, but [partition_by][] is set Directory will be deleted, and will be created with partitions.

    • Directory exists and contains partitions, but [partition_by][] is not set Directory will be deleted, and will be created with partitions.

    • Directory exists and contains partitions, but with different partitioning schema than [partition_by][] Data is appended to a directory with new partitioning schema.

      Warning

      Spark cannot read directory with multiple partitioning schemas, unless using recursive=True to disable partition scanning.

    • Directory exists and partitioned according [partition_by][], but partition is present only in dataframe New partition directory is created.

    • Directory exists and partitioned according [partition_by][], partition is present in both dataframe and directory Partition directory will be deleted, and new one is created with files containing data from dataframe.

    • Directory exists and partitioned according [partition_by][], but partition is present only in directory, not dataframe Existing partition is left intact.

  • replace_entire_directory Remove existing directory and create new one, overwriting all existing data. All existing partitions are dropped.

    Same as Spark's df.write.option("partitionOverwriteMode", "static").mode("overwrite").save().

Note

Unlike using pure Spark, config option spark.sql.sources.partitionOverwriteMode does not affect behavior of any mode

partition_by = Field(default=None, alias='partitionBy') class-attribute instance-attribute

List of columns should be used for data partitioning. None means partitioning is disabled.

Each partition is a folder which contains only files with the specific column value, like some.csv/col1=value1, some.csv/col1=value2, and so on.

Multiple partitions columns means nested folder structure, like some.csv/col1=val1/col2=val2.

If WHERE clause in the query contains expression like partition = value, Spark will scan only files in a specific partition.

Examples: reg_id or ["reg_id", "business_dt"]

Note

Values should be scalars (integers, strings), and either static (countryId) or incrementing (dates, years), with low number of distinct values.

Columns like userId or datetime/timestamp should NOT be used for partitioning.

apply_to_writer(writer)

Apply provided format to pyspark.sql.DataFrameWriter. support hooks

Returns

pyspark.sql.DataFrameWriter Writer with options applied.

Source code in onetl/file/file_df_writer/options.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@slot
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
    """
    Apply provided format to `pyspark.sql.DataFrameWriter`. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    Returns
    -------
    pyspark.sql.DataFrameWriter
        Writer with options applied.
    """
    for method, value in self.dict(by_alias=True, exclude_none=True, exclude={"if_exists"}).items():
        # <value> is the arguments that will be passed to the <method>
        # format orc, parquet methods and format simultaneously
        if hasattr(writer, method):
            if isinstance(value, Iterable) and not isinstance(value, str):
                writer = getattr(writer, method)(*value)
            else:
                writer = getattr(writer, method)(value)
        else:
            writer = writer.option(method, value)

    if self.if_exists == FileDFExistBehavior.REPLACE_OVERLAPPING_PARTITIONS:
        writer = writer.mode("overwrite").option("partitionOverwriteMode", "dynamic")
    elif self.if_exists == FileDFExistBehavior.REPLACE_ENTIRE_DIRECTORY:
        writer = writer.mode("overwrite").option("partitionOverwriteMode", "static")
    elif self.if_exists == FileDFExistBehavior.SKIP_ENTIRE_DIRECTORY:
        writer = writer.mode("ignore")
    elif self.if_exists == FileDFExistBehavior.APPEND:
        writer = writer.mode("append")
    else:
        writer = writer.mode("error")

    return writer