Bases: SparkFileDFConnection
Spark connection to local filesystem. 
Based on Spark Generic File Data Source.
Warning
To use the SparkLocalFS connector you should have PySpark installed (or injected to sys.path)
BEFORE creating the connector instance.
See the [install-spark][] installation instructions for more details.
Warning
Currently supports only Spark sessions created with option spark.master: local.
Note
Supports only reading files as Spark DataFrame and writing DataFrame to files.
Does NOT support file operations, like create, delete, rename, etc.
Parameters
spark : pyspark.sql.SparkSession
Spark session
Examples
from onetl.connection import SparkLocalFS
from pyspark.sql import SparkSession
# create Spark session
spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()
# create connection
local_fs = SparkLocalFS(spark=spark).check()
Source code in onetl/connection/file_df_connection/spark_local_fs.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 | @support_hooks
class SparkLocalFS(SparkFileDFConnection):
    """
    Spark connection to local filesystem. [](/hooks/)

    Based on [Spark Generic File Data Source](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html).

    !!! warning

        To use SparkLocalFS connector you should have PySpark installed (or injected to `sys.path`)
        BEFORE creating the connector instance.

        See [install-spark][] installation instruction for more details.

    !!! warning

        Currently supports only Spark sessions created with option `spark.master: local`.

    !!! note

        Supports only reading files as Spark DataFrame and writing DataFrame to files.

        Does NOT support file operations, like create, delete, rename, etc.

    !!! success "Added in 0.9.0"

    Parameters
    ----------
    spark : `pyspark.sql.SparkSession`
        Spark session

    Examples
    --------
    ```python
    from onetl.connection import SparkLocalFS
    from pyspark.sql import SparkSession

    # create Spark session
    spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate()

    # create connection
    local_fs = SparkLocalFS(spark=spark).check()
    ```
    """

    @slot
    def path_from_string(self, path: os.PathLike | str) -> Path:
        """Convert a string or path-like object into a ``LocalPath``."""
        return LocalPath(os.fspath(path))

    @property
    def instance_url(self) -> str:
        """URL identifying this connection: ``file://`` followed by the host's FQDN."""
        fqdn = socket.getfqdn()
        return f"file://{fqdn}"

    def __str__(self) -> str:
        # str should not make network requests
        return "LocalFS"

    @validator("spark")
    def _validate_spark(cls, spark):
        """
        Reject Spark sessions whose ``spark.master`` is not a local master.

        The check is a prefix match, so any value starting with ``local``
        (e.g. ``local[4]``) is accepted.

        Raises
        ------
        ValueError
            If ``spark.master`` does not start with ``local``.
        """
        master = spark.conf.get("spark.master")
        if not master.startswith("local"):
            msg = f"Currently supports only spark.master='local', got {master!r}"
            raise ValueError(msg)
        return spark

    def _convert_to_url(self, path: PurePathProtocol) -> str:
        """Build a ``file:///`` URL from the POSIX form of ``path``."""
        # "file:///absolute/path" on Unix
        # "file:///c:/absolute/path" on Windows
        # relative paths cannot be passed using file:// syntax
        return "file:///" + path.as_posix().lstrip("/")

    def _get_default_path(self):
        """Return the current working directory as the default path."""
        return LocalPath.cwd()
|