Bases: IcebergWarehouse, FrozenModel
Iceberg Filesystem Warehouse.
Note
This warehouse uses FileDFConnection classes to access data at the warehouse location.
It relies on Spark's filesystem configuration and behavior.
Parameters
connection : SparkFileDFConnection
File connection for data storage
path : str
Warehouse path
Examples
from onetl.connection import Iceberg, SparkLocalFS
local_fs_connection = SparkLocalFS(spark=spark)
warehouse = Iceberg.FilesystemWarehouse(
connection=local_fs_connection,
path="/warehouse/path",
)
from onetl.connection import Iceberg, SparkHDFS
hdfs_connection = SparkHDFS(
host="namenode",
cluster="my-cluster",
spark=spark,
)
warehouse = Iceberg.FilesystemWarehouse(
connection=hdfs_connection,
path="/warehouse/path",
)
from onetl.connection import Iceberg, SparkS3
s3_connection = SparkS3(
host="s3.domain.com",
protocol="http",
bucket="my-bucket",
access_key="access_key",
secret_key="secret_key",
path_style_access=True,
region="us-east-1",
spark=spark,
)
warehouse = Iceberg.FilesystemWarehouse(
connection=s3_connection,
path="/warehouse/path"
)
Source code in onetl/connection/db_connection/iceberg/warehouse/filesystem.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 | @support_hooks
class IcebergFilesystemWarehouse(IcebergWarehouse, FrozenModel):
    """Iceberg Filesystem Warehouse.

    !!! success "Added in 0.15.0"

    !!! note

        This warehouse uses **FileDFConnection** classes to access data at the warehouse location.
        It relies on **Spark's filesystem configuration and behavior**.

    Parameters
    ----------
    connection : SparkFileDFConnection
        File connection for data storage

    path : str
        Warehouse path

    Examples
    --------

    === "Local filesystem"

        ```python
        from onetl.connection import Iceberg, SparkLocalFS

        local_fs_connection = SparkLocalFS(spark=spark)

        warehouse = Iceberg.FilesystemWarehouse(
            connection=local_fs_connection,
            path="/warehouse/path",
        )
        ```

    === "HDFS"

        ```python
        from onetl.connection import Iceberg, SparkHDFS

        hdfs_connection = SparkHDFS(
            host="namenode",
            cluster="my-cluster",
            spark=spark,
        )

        warehouse = Iceberg.FilesystemWarehouse(
            connection=hdfs_connection,
            path="/warehouse/path",
        )
        ```

    === "S3"

        ```python
        from onetl.connection import Iceberg, SparkS3

        s3_connection = SparkS3(
            host="s3.domain.com",
            protocol="http",
            bucket="my-bucket",
            access_key="access_key",
            secret_key="secret_key",
            path_style_access=True,
            region="us-east-1",
            spark=spark,
        )

        warehouse = Iceberg.FilesystemWarehouse(
            connection=s3_connection,
            path="/warehouse/path",
        )
        ```
    """

    # NOTE: ``connection`` is declared before ``path`` on purpose: the ``path``
    # validator below looks up ``connection`` in ``values``.
    # NOTE(review): presumably ``values`` only holds fields declared earlier - confirm.
    connection: SparkFileDFConnection
    path: PurePathProtocol

    @slot
    def get_config(self) -> dict[str, str]:
        """Build the configuration mapping for this warehouse.

        Returns
        -------
        dict[str, str]
            Always contains ``warehouse`` (the warehouse path converted to an URL
            by the connection) and ``io-impl``. If the connection is a ``SparkS3``,
            the connection's expected Hadoop options are added as well, each key
            prefixed with ``hadoop.``.
        """
        warehouse_url = self.connection._convert_to_url(self.path)  # noqa: SLF001
        config = {
            "warehouse": warehouse_url,
            "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
        }

        # Only S3 connections contribute extra Hadoop options.
        if not isinstance(self.connection, SparkS3):
            return config

        prefix = self.connection._get_hadoop_config_prefix()  # noqa: SLF001
        expected_options = self.connection._get_expected_hadoop_config(prefix)  # noqa: SLF001
        for option, value in expected_options.items():
            config["hadoop." + option] = value
        return config

    @validator("path", pre=True)
    def _validate_path(cls, path, values):
        """Convert the raw ``path`` using the connection, when one is available.

        If ``values`` has no ``SparkFileDFConnection`` under ``connection``,
        the raw value is returned unchanged.
        """
        connection = values.get("connection")
        if not isinstance(connection, SparkFileDFConnection):
            return path
        return connection.path_from_string(path)
|