Skip to content

FileDF Reader

Bases: FrozenModel

Allows you to read files from a source path with specified file connection and parameters, and return a Spark DataFrame. |support_hooks|

.. warning::

This class does **not** support read strategies.

.. versionadded:: 0.9.0

Parameters

connection : :obj:BaseFileDFConnection <onetl.base.base_file_df_connection.BaseFileDFConnection> File DataFrame connection. See :ref:file-df-connections section.

format : :obj:BaseReadableFileFormat <onetl.base.base_file_format.BaseReadableFileFormat>

File format to read.

source_path : os.PathLike or str, optional, default: None

Directory path to read data from.

Could be None, but only if you pass file paths directly to :obj:~run method

df_schema : :obj:pyspark.sql.types.StructType, optional, default: None

Spark DataFrame schema.

options : :obj:FileDFReaderOptions <onetl.file.file_df_reader.options.FileDFReaderOptions>, optional

Common reading options.

Examples

.. tabs::

.. code-tab:: py Read CSV files from local filesystem

    from onetl.connection import SparkLocalFS
    from onetl.file import FileDFReader
    from onetl.file.format import CSV

    csv = CSV(delimiter=",")
    local_fs = SparkLocalFS(spark=spark)

    reader = FileDFReader(
        connection=local_fs,
        format=csv,
        source_path="/path/to/directory",
    )

.. code-tab:: py All supported options

    from onetl.connection import SparkLocalFS
    from onetl.file import FileDFReader
    from onetl.file.format import CSV

    csv = CSV(delimiter=",")
    local_fs = SparkLocalFS(spark=spark)

    reader = FileDFReader(
        connection=local_fs,
        format=csv,
        source_path="/path/to/directory",
        options=FileDFReader.Options(recursive=False),
    )
Source code in onetl/file/file_df_reader/file_df_reader.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
@support_hooks
class FileDFReader(FrozenModel):
    """Allows you to read files from a source path with specified file connection
    and parameters, and return a Spark DataFrame. |support_hooks|

    .. warning::

        This class does **not** support read strategies.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    connection : :obj:`BaseFileDFConnection <onetl.base.base_file_df_connection.BaseFileDFConnection>`
        File DataFrame connection. See :ref:`file-df-connections` section.

    format : :obj:`BaseReadableFileFormat <onetl.base.base_file_format.BaseReadableFileFormat>`
        File format to read.

    source_path : os.PathLike or str, optional, default: ``None``
        Directory path to read data from.

        Could be ``None``, but only if you pass file paths directly to
        :obj:`~run` method

    df_schema : :obj:`pyspark.sql.types.StructType`, optional, default: ``None``
        Spark DataFrame schema.

    options : :obj:`FileDFReaderOptions <onetl.file.file_df_reader.options.FileDFReaderOptions>`, optional
        Common reading options.

    Examples
    --------

    .. tabs::

        .. code-tab:: py Read CSV files from local filesystem

            from onetl.connection import SparkLocalFS
            from onetl.file import FileDFReader
            from onetl.file.format import CSV

            csv = CSV(delimiter=",")
            local_fs = SparkLocalFS(spark=spark)

            reader = FileDFReader(
                connection=local_fs,
                format=csv,
                source_path="/path/to/directory",
            )

        .. code-tab:: py All supported options

            from onetl.connection import SparkLocalFS
            from onetl.file import FileDFReader
            from onetl.file.format import CSV

            csv = CSV(delimiter=",")
            local_fs = SparkLocalFS(spark=spark)

            reader = FileDFReader(
                connection=local_fs,
                format=csv,
                source_path="/path/to/directory",
                options=FileDFReader.Options(recursive=False),
            )
    """

    # Alias so callers can write ``FileDFReader.Options(...)``
    Options = FileDFReaderOptions

    connection: BaseFileDFConnection
    format: BaseReadableFileFormat
    source_path: Optional[PurePathProtocol] = None
    df_schema: Optional[StructType] = None
    options: FileDFReaderOptions = FileDFReaderOptions()

    # Set to True after the first successful ``connection.check()`` in :obj:`~run`,
    # so repeated ``run()`` calls on the same instance do not re-check the connection.
    _connection_checked: bool = PrivateAttr(default=False)

    @slot
    def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
        """
        Method for reading files as DataFrame. |support_hooks|

        .. versionadded:: 0.9.0

        Parameters
        ----------

        files : Iterator[str | os.PathLike] | None, default ``None``
            File list to read.

            If empty, read files from ``source_path``.

        Returns
        -------
        df : :obj:`pyspark.sql.DataFrame`

            Spark DataFrame

        Examples
        --------

        Read CSV files from directory ``/path``:

        .. code:: python

            from onetl.connection import SparkLocalFS
            from onetl.file import FileDFReader
            from onetl.file.format import CSV

            csv = CSV(delimiter=",")
            local_fs = SparkLocalFS(spark=spark)

            reader = FileDFReader(
                connection=local_fs,
                format=csv,
                source_path="/path",
            )
            df = reader.run()

        Read some CSV files using file paths:

        .. code:: python

            from onetl.connection import SparkLocalFS
            from onetl.file import FileDFReader
            from onetl.file.format import CSV

            csv = CSV(delimiter=",")
            local_fs = SparkLocalFS(spark=spark)

            reader = FileDFReader(
                connection=local_fs,
                format=csv,
            )

            df = reader.run(
                [
                    "/path/file1.csv",
                    "/path/nested/file2.csv",
                ]
            )

        Read only specific CSV files in directory:

        .. code:: python

            from onetl.connection import SparkLocalFS
            from onetl.file import FileDFReader
            from onetl.file.format import CSV

            csv = CSV(delimiter=",")
            local_fs = SparkLocalFS(spark=spark)

            reader = FileDFReader(
                connection=local_fs,
                format=csv,
                source_path="/path",
            )

            df = reader.run(
                [
                    # file paths could be relative
                    "/path/file1.csv",
                    "/path/nested/file2.csv",
                ]
            )
        """

        entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")

        # At least one source of paths is required
        if files is None and not self.source_path:
            raise ValueError("Neither file list nor `source_path` are passed")

        # Log parameters and check the connection only on the first run() call
        if not self._connection_checked:
            self._log_parameters(files)
            self.connection.check()
            self._connection_checked = True

        if files:
            job_description = f"{self.connection} -> {self.__class__.__name__}.run([..files..])"
        else:
            job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source_path})"

        with override_job_description(self.connection.spark, job_description):
            # Explicit file list takes priority over source_path
            paths: FileSet[PurePathProtocol] = FileSet()
            if files is not None:
                paths = FileSet(self._validate_files(files))
            elif self.source_path:
                paths = FileSet([self.source_path])

            df = self._read_files(paths)

        entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
        return df

    def _read_files(self, paths: FileSet[PurePathProtocol]) -> DataFrame:
        """Log the paths being read and delegate DataFrame creation to the connection."""
        log.info("|%s| Paths to be read:", self.__class__.__name__)
        log_lines(log, str(paths))
        log_with_indent(log, "")

        return self.connection.read_files_as_df(
            root=self.source_path,
            paths=list(paths),
            format=self.format,
            df_schema=self.df_schema,
            options=self.options,
        )

    def _log_parameters(self, files: Iterable[str | os.PathLike] | None = None) -> None:
        """Log reader configuration (source_path, format, schema, options) before reading."""
        log.info("|%s| -> |Spark| Reading files using parameters:", self.connection.__class__.__name__)
        log_with_indent(log, "source_path = %s", f"'{self.source_path}'" if self.source_path else "None")
        log_with_indent(log, "format = %r", self.format)

        if self.df_schema:
            # Create an empty DataFrame only to reuse the schema pretty-printer
            empty_df = self.connection.spark.createDataFrame([], self.df_schema)  # type: ignore[attr-defined]
            log_dataframe_schema(log, empty_df)

        options_dict = self.options.dict(exclude_none=True)
        log_options(log, options_dict)

        if files is not None and self.source_path:
            log.warning(
                "|%s| Passed both `source_path` and files list at the same time. Using explicit files list",
                self.__class__.__name__,
            )

    @validator("source_path", pre=True)
    def _validate_source_path(cls, source_path, values):
        """Convert a raw ``source_path`` string into a connection-specific path object."""
        if source_path is None:
            return None

        # ``connection`` is declared before ``source_path``, so it is already present
        # in ``values`` when this validator runs
        connection = values.get("connection")
        if isinstance(connection, BaseFileDFConnection):
            return connection.path_from_string(source_path)
        return source_path

    @validator("format")
    def _validate_format(cls, format, values):  # noqa: WPS125
        """Reject file formats the connection does not support reading."""
        connection = values.get("connection")
        if isinstance(connection, BaseFileDFConnection):
            connection.check_if_format_supported(format)
        return format

    @validator("options")
    def _validate_options(cls, value):
        """Coerce dict/None/other inputs into a ``FileDFReaderOptions`` instance."""
        return cls.Options.parse(value)

    def _validate_files(  # noqa: WPS231
        self,
        files: Iterable[os.PathLike | str],
    ) -> OrderedSet[PurePathProtocol]:
        """Convert input file paths to absolute path objects, deduplicated but keeping order.

        Raises
        ------
        ValueError
            If a relative path is passed without ``source_path``, or an absolute
            path is not located under ``source_path``.
        """
        result: OrderedSet[PurePathProtocol] = OrderedSet()

        for file in files:
            file_path = file if isinstance(file, PurePathProtocol) else self.connection.path_from_string(file)

            if not self.source_path:
                if not file_path.is_absolute():
                    raise ValueError("Cannot pass relative file path with empty `source_path`")
            elif file_path.is_absolute() and self.source_path not in file_path.parents:
                raise ValueError(f"File path '{file_path}' does not match source_path '{self.source_path}'")
            elif not file_path.is_absolute():
                # Make file path absolute
                file_path = self.source_path / file

            result.add(file_path)

        return result

    @classmethod
    def _forward_refs(cls) -> dict[str, type]:
        """Resolve the ``StructType`` forward reference used in field annotations."""
        try_import_pyspark()
        from pyspark.sql.types import StructType  # noqa: WPS442

        # avoid importing pyspark unless user called the constructor,
        # as we allow user to use `Connection.get_packages()` for creating Spark session
        refs = super()._forward_refs()
        refs["StructType"] = StructType
        return refs

run(files=None)

Method for reading files as DataFrame. |support_hooks|

.. versionadded:: 0.9.0

Parameters

files : Iterator[str | os.PathLike] | None, default None

File list to read.

If empty, read files from source_path.

Returns

df : :obj:pyspark.sql.DataFrame

Spark DataFrame

Examples

Read CSV files from directory /path:

.. code:: python

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
    source_path="/path",
)
df = reader.run()

Read some CSV files using file paths:

.. code:: python

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
)

df = reader.run(
    [
        "/path/file1.csv",
        "/path/nested/file2.csv",
    ]
)

Read only specific CSV files in directory:

.. code:: python

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
    source_path="/path",
)

df = reader.run(
    [
        # file paths could be relative
        "/path/file1.csv",
        "/path/nested/file2.csv",
    ]
)
Source code in onetl/file/file_df_reader/file_df_reader.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@slot
def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
    """
    Read files and return their content as a Spark DataFrame. |support_hooks|

    .. versionadded:: 0.9.0

    Parameters
    ----------

    files : Iterator[str | os.PathLike] | None, default ``None``
        Explicit list of files to read.

        If empty, read files from ``source_path``.

    Returns
    -------
    df : :obj:`pyspark.sql.DataFrame`

        Spark DataFrame

    Examples
    --------

    Read every CSV file found under directory ``/path``:

    .. code:: python

        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
            source_path="/path",
        )
        df = reader.run()

    Read an explicit list of CSV files (no ``source_path``, paths must be absolute):

    .. code:: python

        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
        )

        df = reader.run(
            [
                "/path/file1.csv",
                "/path/nested/file2.csv",
            ]
        )

    Read only specific CSV files located inside ``source_path``:

    .. code:: python

        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
            source_path="/path",
        )

        df = reader.run(
            [
                # file paths could be relative
                "/path/file1.csv",
                "/path/nested/file2.csv",
            ]
        )
    """

    entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")

    # At least one source of paths is required
    if files is None and not self.source_path:
        raise ValueError("Neither file list nor `source_path` are passed")

    # Log parameters and check the connection only on the first run() call
    if not self._connection_checked:
        self._log_parameters(files)
        self.connection.check()
        self._connection_checked = True

    target = "[..files..]" if files else f"{self.source_path}"
    description = f"{self.connection} -> {self.__class__.__name__}.run({target})"

    with override_job_description(self.connection.spark, description):
        # Explicit file list takes priority over source_path
        if files is not None:
            path_set: FileSet[PurePathProtocol] = FileSet(self._validate_files(files))
        elif self.source_path:
            path_set = FileSet([self.source_path])
        else:
            path_set = FileSet()

        dataframe = self._read_files(path_set)

    entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
    return dataframe