Spark on the local filesystem

Bases: SparkFileDFConnection

Spark connection to the local filesystem. Supports hooks.

Based on Spark Generic File Data Source.

Warning

To use the SparkLocalFS connector, PySpark must be installed (or injected into sys.path) BEFORE creating the connector instance.

See the [install-spark][] installation instructions for more details.

Warning

Currently supports only Spark sessions whose spark.master option starts with local (for example local or local[*]).
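
The restriction above can be illustrated with a minimal sketch of a prefix check on the master URL. The `is_local_master` helper is hypothetical and not part of onETL; it only assumes, as the connector's validator in the source listing does, that any `spark.master` value starting with `local` is accepted.

```python
def is_local_master(master: str) -> bool:
    """Hypothetical helper: True if a spark.master value denotes local mode."""
    # "local", "local[4]" and "local[*]" all start with "local"
    return master.startswith("local")


# Try a few typical master URLs
for master in ["local", "local[*]", "yarn", "spark://host:7077"]:
    print(master, is_local_master(master))
```

A session whose master fails this kind of check is rejected with a `ValueError` when the connection is created.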

Note

Supports only reading files as Spark DataFrame and writing DataFrame to files.

Does NOT support file operations, like create, delete, rename, etc.

Added in 0.9.0

Parameters

spark : pyspark.sql.SparkSession
    Spark session

Examples

from onetl.connection import SparkLocalFS
from pyspark.sql import SparkSession

# create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

# create connection
local_fs = SparkLocalFS(spark=spark).check()
Source code in onetl/connection/file_df_connection/spark_local_fs.py
@support_hooks
class SparkLocalFS(SparkFileDFConnection):
    """
    Spark connection to local filesystem. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    Based on [Spark Generic File Data Source](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html).

    !!! warning

        To use the SparkLocalFS connector you should have PySpark installed (or injected to `sys.path`)
        BEFORE creating the connector instance.

        See [install-spark][] installation instruction for more details.

    !!! warning

        Currently supports only Spark sessions whose `spark.master` option starts with `local` (for example `local` or `local[*]`).

    !!! note

        Supports only reading files as Spark DataFrame and writing DataFrame to files.

        Does NOT support file operations, like create, delete, rename, etc.

    !!! success "Added in 0.9.0"

    Parameters
    ----------
    spark : `pyspark.sql.SparkSession`
        Spark session

    Examples
    --------

    ```python
    from onetl.connection import SparkLocalFS
    from pyspark.sql import SparkSession

    # create Spark session
    spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

    # create connection
    local_fs = SparkLocalFS(spark=spark).check()
    ```
    """

    @slot
    def path_from_string(self, path: os.PathLike | str) -> Path:
        return LocalPath(os.fspath(path))

    @property
    def instance_url(self):
        fqdn = socket.getfqdn()
        return f"file://{fqdn}"

    def __str__(self):
        # str should not make network requests
        return "LocalFS"

    @validator("spark")
    def _validate_spark(cls, spark):
        master = spark.conf.get("spark.master")
        if not master.startswith("local"):
            msg = f"Currently supports only spark.master='local', got {master!r}"
            raise ValueError(msg)
        return spark

    def _convert_to_url(self, path: PurePathProtocol) -> str:
        # "file:///absolute/path" on Unix
        # "file:///c:/absolute/path" on Windows
        # relative paths cannot be passed using file:// syntax
        return "file:///" + path.as_posix().lstrip("/")

    def _get_default_path(self):
        return LocalPath.cwd()
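
The path-to-URL conversion in `_convert_to_url` can be tried outside Spark with a minimal sketch. The `to_file_url` function is a hypothetical stand-in that copies the same expression, and the standard library's `PurePosixPath` and `PureWindowsPath` are used here in place of onETL's own path classes, which is an assumption made only for illustration.

```python
from pathlib import PurePosixPath, PureWindowsPath


def to_file_url(path) -> str:
    """Hypothetical stand-in for the connector's path-to-URL conversion."""
    # as_posix() yields forward slashes; strip any leading slash so that
    # exactly three slashes follow "file:"
    return "file:///" + path.as_posix().lstrip("/")


# Absolute Unix path
print(to_file_url(PurePosixPath("/absolute/path")))
# Absolute Windows path with a drive letter
print(to_file_url(PureWindowsPath("c:/absolute/path")))
```

Both calls produce a URL with an empty host part. A relative path would be turned into an absolute-looking URL by this expression, which is why relative paths cannot be passed this way.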