Skip to content

FileDF Reader

Bases: FrozenModel

Reads files from a source path using the specified file connection and parameters, and returns a Spark DataFrame. Supports hooks.

Warning

This class does not support read strategies.

Added in 0.9.0

Parameters

connection : BaseFileDFConnection — File DataFrame connection. See the File DataFrame connections section.

format : BaseReadableFileFormat

File format to read.

source_path : os.PathLike or str, optional, default: None

Directory path to read data from.

Can be None, but only if you pass file paths directly to the run method.

df_schema : pyspark.sql.types.StructType, optional, default: None

Spark DataFrame schema.

options : FileDFReaderOptions, optional

Common reading options.

Examples

Read CSV files from local filesystem:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
    source_path="/path/to/directory",
)

All supported options:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
    source_path="/path/to/directory",
    options=FileDFReader.Options(recursive=False),
)
Source code in onetl/file/file_df_reader/file_df_reader.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
@support_hooks
class FileDFReader(FrozenModel):
    """Allows you to read files from a source path with specified file connection
    and parameters, and return a Spark DataFrame. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    !!! warning

        This class does **not** support read strategies.

    !!! success "Added in 0.9.0"

    Parameters
    ----------
    connection : [BaseFileDFConnection][onetl.base.base_file_df_connection.BaseFileDFConnection]
        File DataFrame connection. See [file-df-connections][] section.

    format : [BaseReadableFileFormat][onetl.base.base_file_format.BaseReadableFileFormat]
        File format to read.

    source_path : os.PathLike or str, optional, default: `None`
        Directory path to read data from.

        Could be `None`, but only if you pass file paths directly to
        [run][] method

    df_schema : `pyspark.sql.types.StructType`, optional, default: `None`
        Spark DataFrame schema.

    options : [FileDFReaderOptions][onetl.file.file_df_reader.options.FileDFReaderOptions], optional
        Common reading options.

    Examples
    --------

    === "Read CSV files from local filesystem"
        ```python
        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
            source_path="/path/to/directory",
        )
        ```
    === "All supported options"
        ```python
        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
            source_path="/path/to/directory",
            options=FileDFReader.Options(recursive=False),
        )
        ```
    """

    Options = FileDFReaderOptions

    connection: BaseFileDFConnection
    format: BaseReadableFileFormat
    source_path: Optional[PurePathProtocol] = None
    df_schema: Optional[StructType] = None
    options: FileDFReaderOptions = FileDFReaderOptions()

    # Set after the first `run()` call passes its checks, so parameters are logged
    # and `connection.check()` is called only once per reader instance.
    _connection_checked: bool = PrivateAttr(default=False)

    @slot
    def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
        """
        Method for reading files as DataFrame. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

        !!! success "Added in 0.9.0"

        Parameters
        ----------

        files : Iterable[str | os.PathLike] | None, default `None`
            File list to read.

            If `None`, read files from `source_path`.

            Relative paths are resolved against `source_path`; absolute paths
            must be located inside `source_path` (if it is set).

        Returns
        -------
        df : `pyspark.sql.DataFrame`

            Spark DataFrame

        Raises
        ------
        ValueError
            If neither `files` nor `source_path` is passed, if a relative path is
            passed while `source_path` is empty, or if an absolute path is not
            located inside `source_path`.

        Examples
        --------

        Read CSV files from directory `/path`:

        ```python
        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
            source_path="/path",
        )
        df = reader.run()
        ```
        Read some CSV files using file paths:

        ```python
        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
        )

        df = reader.run(
            [
                "/path/file1.csv",
                "/path/nested/file2.csv",
            ]
        )
        ```
        Read only specific CSV files in directory:

        ```python
        from onetl.connection import SparkLocalFS
        from onetl.file import FileDFReader
        from onetl.file.format import CSV

        csv = CSV(delimiter=",")
        local_fs = SparkLocalFS(spark=spark)

        reader = FileDFReader(
            connection=local_fs,
            format=csv,
            source_path="/path",
        )

        df = reader.run(
            [
                # file paths could be absolute (inside `source_path`) or relative to it
                "/path/file1.csv",
                "nested/file2.csv",
            ]
        )
        ```
        """

        entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")

        if files is None and not self.source_path:
            msg = "Neither file list nor `source_path` are passed"
            raise ValueError(msg)

        if not self._connection_checked:
            self._log_parameters(files)
            self.connection.check()
            self._connection_checked = True

        # Use the same condition as the branch below: an explicit file list
        # (even an empty one) takes precedence over `source_path`, so the Spark UI
        # label must not claim `source_path` is being read in that case.
        if files is not None:
            job_description = f"{self.connection} -> {self.__class__.__name__}.run([..files..])"
        else:
            job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source_path})"

        with override_job_description(self.connection.spark, job_description):
            paths: FileSet[PurePathProtocol] = FileSet()
            if files is not None:
                paths = FileSet(self._validate_files(files))
            elif self.source_path:
                paths = FileSet([self.source_path])

            df = self._read_files(paths)

        entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
        return df

    def _read_files(self, paths: FileSet[PurePathProtocol]) -> DataFrame:
        """Log the paths to be read, then delegate reading to `connection.read_files_as_df`."""
        log.info("|%s| Paths to be read:", self.__class__.__name__)
        log_lines(log, str(paths))
        log_with_indent(log, "")

        return self.connection.read_files_as_df(
            root=self.source_path,
            paths=list(paths),
            format=self.format,
            df_schema=self.df_schema,
            options=self.options,
        )

    def _log_parameters(self, files: Iterable[str | os.PathLike] | None = None) -> None:
        """Log reader parameters.

        Warns when both `source_path` and an explicit file list are passed,
        because the explicit list is the one that gets read.
        """
        log.info("|%s| -> |Spark| Reading files using parameters:", self.connection.__class__.__name__)
        log_with_indent(log, "source_path = %s", f"'{self.source_path}'" if self.source_path else "None")
        log_with_indent(log, "format = %r", self.format)

        if self.df_schema:
            # Build an empty DataFrame only to reuse the DataFrame schema logger.
            empty_df = self.connection.spark.createDataFrame([], self.df_schema)  # type: ignore[attr-defined]
            log_dataframe_schema(log, empty_df)

        options_dict = self.options.dict(exclude_none=True)
        log_options(log, options_dict)

        if files is not None and self.source_path:
            log.warning(
                "|%s| Passed both `source_path` and files list at the same time. Using explicit files list",
                self.__class__.__name__,
            )

    @validator("source_path", pre=True)
    def _validate_source_path(cls, source_path, values):
        """Parse `source_path` using the connection's path type.

        If `connection` is missing from `values` (e.g. it failed validation) or is
        not a `BaseFileDFConnection`, the raw value is returned unchanged.
        """
        if source_path is None:
            return None

        connection = values.get("connection")
        if isinstance(connection, BaseFileDFConnection):
            return connection.path_from_string(source_path)
        return source_path

    @validator("format")
    def _validate_format(cls, format, values):
        """Ask the connection whether it supports `format` (presumably raises if not)."""
        connection = values.get("connection")
        if isinstance(connection, BaseFileDFConnection):
            connection.check_if_format_supported(format)
        return format

    @validator("options")
    def _validate_options(cls, value):
        """Normalize `options` through `Options.parse`."""
        return cls.Options.parse(value)

    def _validate_files(
        self,
        files: Iterable[os.PathLike | str],
    ) -> OrderedSet[PurePathProtocol]:
        """Convert `files` to connection paths and check them against `source_path`.

        - Without `source_path`, every path must be absolute.
        - With `source_path`, absolute paths must be located inside it, and
          relative paths are joined to it.

        Raises
        ------
        ValueError
            If a relative path is passed while `source_path` is empty, or an
            absolute path is not located inside `source_path`.
        """
        result: OrderedSet[PurePathProtocol] = OrderedSet()

        for file in files:
            file_path = file if isinstance(file, PurePathProtocol) else self.connection.path_from_string(file)

            if not self.source_path:
                if not file_path.is_absolute():
                    msg = "Cannot pass relative file path with empty `source_path`"
                    raise ValueError(msg)
            elif file_path.is_absolute() and self.source_path not in file_path.parents:
                msg = f"File path '{file_path}' does not match source_path '{self.source_path}'"
                raise ValueError(msg)
            elif not file_path.is_absolute():
                # Make file path absolute. Join the already-parsed `file_path`
                # (the value the checks above ran on), not the raw user input.
                file_path = self.source_path / file_path

            result.add(file_path)

        return result

    @classmethod
    def _forward_refs(cls) -> dict[str, type]:
        try_import_pyspark()
        from pyspark.sql.types import StructType

        # avoid importing pyspark unless user called the constructor,
        # as we allow user to use `Connection.get_packages()` for creating Spark session
        refs = super()._forward_refs()
        refs["StructType"] = StructType
        return refs

run(files=None)

Method for reading files as a DataFrame. Supports hooks.

Added in 0.9.0

Parameters

files : Iterable[str | os.PathLike] | None, default: None

File list to read.

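If None, read files from source_path. Relative paths are resolved against source_path; absolute paths must be located inside source_path (if it is set).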

Returns

df : pyspark.sql.DataFrame

Spark DataFrame

Examples

Read CSV files from directory /path:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
    source_path="/path",
)
df = reader.run()
Read some CSV files using file paths:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
)

df = reader.run(
    [
        "/path/file1.csv",
        "/path/nested/file2.csv",
    ]
)
Read only specific CSV files in directory:

from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV

csv = CSV(delimiter=",")
local_fs = SparkLocalFS(spark=spark)

reader = FileDFReader(
    connection=local_fs,
    format=csv,
    source_path="/path",
)

df = reader.run(
    [
        # file paths could be absolute (inside source_path) or relative to it
        "/path/file1.csv",
        "nested/file2.csv",
    ]
)
Source code in onetl/file/file_df_reader/file_df_reader.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
@slot
def run(self, files: Iterable[str | os.PathLike] | None = None) -> DataFrame:
    """
    Method for reading files as DataFrame. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    !!! success "Added in 0.9.0"

    Parameters
    ----------

    files : Iterable[str | os.PathLike] | None, default `None`
        File list to read.

        If `None`, read files from `source_path`.

        Relative paths are resolved against `source_path`; absolute paths
        must be located inside `source_path` (if it is set).

    Returns
    -------
    df : `pyspark.sql.DataFrame`

        Spark DataFrame

    Raises
    ------
    ValueError
        If neither `files` nor `source_path` is passed, or if the passed file
        paths fail validation against `source_path`.

    Examples
    --------

    Read CSV files from directory `/path`:

    ```python
    from onetl.connection import SparkLocalFS
    from onetl.file import FileDFReader
    from onetl.file.format import CSV

    csv = CSV(delimiter=",")
    local_fs = SparkLocalFS(spark=spark)

    reader = FileDFReader(
        connection=local_fs,
        format=csv,
        source_path="/path",
    )
    df = reader.run()
    ```
    Read some CSV files using file paths:

    ```python
    from onetl.connection import SparkLocalFS
    from onetl.file import FileDFReader
    from onetl.file.format import CSV

    csv = CSV(delimiter=",")
    local_fs = SparkLocalFS(spark=spark)

    reader = FileDFReader(
        connection=local_fs,
        format=csv,
    )

    df = reader.run(
        [
            "/path/file1.csv",
            "/path/nested/file2.csv",
        ]
    )
    ```
    Read only specific CSV files in directory:

    ```python
    from onetl.connection import SparkLocalFS
    from onetl.file import FileDFReader
    from onetl.file.format import CSV

    csv = CSV(delimiter=",")
    local_fs = SparkLocalFS(spark=spark)

    reader = FileDFReader(
        connection=local_fs,
        format=csv,
        source_path="/path",
    )

    df = reader.run(
        [
            # file paths could be absolute (inside `source_path`) or relative to it
            "/path/file1.csv",
            "nested/file2.csv",
        ]
    )
    ```
    """

    entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() starts")

    if files is None and not self.source_path:
        msg = "Neither file list nor `source_path` are passed"
        raise ValueError(msg)

    # Log parameters and check the connection only on the first call.
    if not self._connection_checked:
        self._log_parameters(files)
        self.connection.check()
        self._connection_checked = True

    # Use the same condition as the branch below: an explicit file list
    # (even an empty one) takes precedence over `source_path`, so the Spark UI
    # label must not claim `source_path` is being read in that case.
    if files is not None:
        job_description = f"{self.connection} -> {self.__class__.__name__}.run([..files..])"
    else:
        job_description = f"{self.connection} -> {self.__class__.__name__}.run({self.source_path})"

    with override_job_description(self.connection.spark, job_description):
        paths: FileSet[PurePathProtocol] = FileSet()
        if files is not None:
            paths = FileSet(self._validate_files(files))
        elif self.source_path:
            paths = FileSet([self.source_path])

        df = self._read_files(paths)

    entity_boundary_log(log, msg=f"{self.__class__.__name__}.run() ends", char="-")
    return df