Bases: SparkFileDFConnection
Spark connection to local filesystem. |support_hooks|
Based on `Spark Generic File Data Source <https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html>`_.
.. warning::
To use SparkLocalFS connector you should have PySpark installed (or injected to ``sys.path``)
BEFORE creating the connector instance.
See :ref:`install-spark` installation instructions for more details.
.. warning::
Currently supports only Spark sessions created with option ``spark.master: local``.
.. note::
Supports only reading files as Spark DataFrame and writing DataFrame to files.
Does NOT support file operations such as create, delete, rename, etc.
.. versionadded:: 0.9.0
Parameters
spark : :class:`pyspark.sql.SparkSession`
Spark session
Examples
.. code:: python
from onetl.connection import SparkLocalFS
from pyspark.sql import SparkSession
# create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()
# create connection
local_fs = SparkLocalFS(spark=spark).check()
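The ``spark.master: local`` restriction mentioned above comes down to a prefix check on the master URL. A minimal sketch of the same rule the ``_validate_spark`` validator applies in the source below (the ``is_local_master`` helper name is made up for illustration):

```python
def is_local_master(master: str) -> bool:
    # SparkLocalFS only accepts sessions whose master URL is local,
    # e.g. "local", "local[4]" or "local[*]" -- cluster masters like
    # "yarn" or "spark://host:7077" are rejected.
    return master.startswith("local")
```

With a cluster master the connector raises ``ValueError`` instead of silently misbehaving.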
Source code in onetl/connection/file_df_connection/spark_local_fs.py
@support_hooks
class SparkLocalFS(SparkFileDFConnection):
"""
Spark connection to local filesystem. |support_hooks|
Based on `Spark Generic File Data Source <https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html>`_.
.. warning::
    To use SparkLocalFS connector you should have PySpark installed (or injected to ``sys.path``)
    BEFORE creating the connector instance.
    See :ref:`install-spark` installation instructions for more details.
.. warning::
Currently supports only Spark sessions created with option ``spark.master: local``.
.. note::
Supports only reading files as Spark DataFrame and writing DataFrame to files.
    Does NOT support file operations such as create, delete, rename, etc.
.. versionadded:: 0.9.0
Parameters
----------
spark : :class:`pyspark.sql.SparkSession`
Spark session
Examples
--------
.. code:: python
from onetl.connection import SparkLocalFS
from pyspark.sql import SparkSession
# create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()
# create connection
local_fs = SparkLocalFS(spark=spark).check()
"""
@slot
def path_from_string(self, path: os.PathLike | str) -> Path:
return LocalPath(os.fspath(path))
@property
def instance_url(self):
fqdn = socket.getfqdn()
return f"file://{fqdn}"
def __str__(self):
# str should not make network requests
return "LocalFS"
@validator("spark")
def _validate_spark(cls, spark):
master = spark.conf.get("spark.master")
if not master.startswith("local"):
raise ValueError(f"Currently supports only spark.master='local', got {master!r}")
return spark
def _convert_to_url(self, path: PurePathProtocol) -> str:
# "file:///absolute/path" on Unix
# "file:///c:/absolute/path" on Windows
# relative paths cannot be passed using file:// syntax
return "file:///" + path.as_posix().lstrip("/")
def _get_default_path(self):
return LocalPath(os.getcwd())
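The ``_convert_to_url`` method above produces the ``file:///`` URLs Spark expects for local paths. The same transformation can be sketched with plain ``pathlib`` (the ``to_file_url`` name is hypothetical, for illustration only):

```python
from pathlib import PurePosixPath

def to_file_url(path: str) -> str:
    # Absolute POSIX path -> "file:///absolute/path" URL.
    # On Windows the drive letter stays in the path: "file:///c:/absolute/path".
    # Relative paths cannot be expressed with the file:// scheme.
    return "file:///" + PurePosixPath(path).as_posix().lstrip("/")

print(to_file_url("/data/input.csv"))  # file:///data/input.csv
```

Stripping the leading slash before re-adding ``file:///`` keeps exactly three slashes in the result on both Unix and Windows-style paths.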