
Spark on the local filesystem

Bases: SparkFileDFConnection

Spark connection to local filesystem. |support_hooks|

Based on `Spark Generic File Data Source <https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html>`_.

.. warning::

To use the SparkLocalFS connector, you should have PySpark installed (or injected into ``sys.path``)
BEFORE creating the connector instance.

See the :ref:`install-spark` installation instructions for more details.

.. warning::

Currently supports only Spark sessions created with a local master, i.e. a ``spark.master`` value starting with ``local`` (such as ``local`` or ``local[*]``).
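The check behind this warning can be sketched in plain Python. The ``_validate_spark`` validator in the source code accepts any ``spark.master`` value that starts with ``local``, so ``local``, ``local[4]`` and ``local[*]`` pass, while ``yarn`` or ``spark://host:7077`` are rejected. The helper name ``check_spark_master`` is hypothetical: it only mirrors that prefix test and does not need a running Spark session.

```python
def check_spark_master(master: str) -> str:
    """Hypothetical helper mirroring the prefix test in SparkLocalFS._validate_spark."""
    # Any master string starting with "local" passes, e.g. "local", "local[4]", "local[*]"
    if not master.startswith("local"):
        raise ValueError(f"Currently supports only spark.master='local', got {master!r}")
    return master


# Try a few typical master URLs and report whether each would be accepted
for value in ["local", "local[4]", "local[*]", "yarn", "spark://host:7077"]:
    try:
        check_spark_master(value)
        print(f"{value!r}: accepted")
    except ValueError as error:
        print(f"{value!r}: rejected ({error})")
```

With a real session, the same string is what ``spark.conf.get("spark.master")`` returns, which is the value the validator inspects.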

.. note::

Supports only reading files into a Spark DataFrame and writing a DataFrame to files.

Does NOT support file operations such as create, delete, or rename.

.. versionadded:: 0.9.0

Parameters

spark : :class:`pyspark.sql.SparkSession`
    Spark session

Examples

.. code:: python

from onetl.connection import SparkLocalFS
from pyspark.sql import SparkSession

# create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

# create connection
local_fs = SparkLocalFS(spark=spark).check()
Source code in onetl/connection/file_df_connection/spark_local_fs.py
@support_hooks
class SparkLocalFS(SparkFileDFConnection):
    """
    Spark connection to local filesystem. |support_hooks|

    Based on `Spark Generic File Data Source <https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html>`_.

    .. warning::

        To use the SparkLocalFS connector, you should have PySpark installed (or injected into ``sys.path``)
        BEFORE creating the connector instance.

        See the :ref:`install-spark` installation instructions for more details.

    .. warning::

        Currently supports only Spark sessions created with a local master, i.e. a ``spark.master`` value starting with ``local`` (such as ``local`` or ``local[*]``).

    .. note::

        Supports only reading files into a Spark DataFrame and writing a DataFrame to files.

        Does NOT support file operations such as create, delete, or rename.

    .. versionadded:: 0.9.0

    Parameters
    ----------
    spark : :class:`pyspark.sql.SparkSession`
        Spark session

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkLocalFS
        from pyspark.sql import SparkSession

        # create Spark session
        spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

        # create connection
        local_fs = SparkLocalFS(spark=spark).check()
    """

    @slot
    def path_from_string(self, path: os.PathLike | str) -> Path:
        return LocalPath(os.fspath(path))

    @property
    def instance_url(self):
        fqdn = socket.getfqdn()
        return f"file://{fqdn}"

    def __str__(self):
        # str should not make network requests
        return "LocalFS"

    @validator("spark")
    def _validate_spark(cls, spark):
        master = spark.conf.get("spark.master")
        if not master.startswith("local"):
            raise ValueError(f"Currently supports only spark.master='local', got {master!r}")
        return spark

    def _convert_to_url(self, path: PurePathProtocol) -> str:
        # "file:///absolute/path" on Unix
        # "file:///c:/absolute/path" on Windows
        # relative paths cannot be passed using file:// syntax
        return "file:///" + path.as_posix().lstrip("/")

    def _get_default_path(self):
        return LocalPath(os.getcwd())
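To see which URLs ``_convert_to_url`` produces, the same expression can be applied to standard ``pathlib`` pure paths. This sketch assumes that ``LocalPath`` behaves like a ``pathlib`` path with respect to ``as_posix()``; ``PurePosixPath`` and ``PureWindowsPath`` stand in for it so the example runs on any OS. The function name ``to_file_url`` is hypothetical.

```python
from pathlib import PurePath, PurePosixPath, PureWindowsPath


def to_file_url(path: PurePath) -> str:
    # Same expression as SparkLocalFS._convert_to_url:
    # convert to forward slashes, drop any leading "/", then prefix "file:///"
    return "file:///" + path.as_posix().lstrip("/")


print(to_file_url(PurePosixPath("/absolute/path")))      # file:///absolute/path
print(to_file_url(PureWindowsPath("c:/absolute/path")))  # file:///c:/absolute/path

# A relative path is not rejected, but the result names an absolute location,
# which is why the source comment says relative paths cannot use file:// syntax
print(to_file_url(PurePosixPath("relative/path")))       # file:///relative/path
```

Stripping the leading slash before adding ``file:///`` lets one expression cover both cases: POSIX absolute paths keep exactly three slashes, and Windows paths with a drive letter get the ``file:///c:/...`` form.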