Bases: IcebergWarehouse, FrozenModel
Iceberg S3 Warehouse. 
Note
This warehouse uses a dedicated Iceberg S3 client to access the warehouse data.
It does not rely on SparkS3 implementation.
Parameters
path : str
Warehouse path
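host : str
S3 endpoint hostname
port : int, optional
S3 endpoint port
protocol : Literal["http", "https"], default: "https"
Protocol to use for S3 connections
bucket : str
S3 bucket name
path_style_access : bool, default: False
Whether to use path-style access
access_key : str, optional
S3 access key
secret_key : str, optional
S3 secret key
session_token : str, optional
S3 session token for temporary credentials
region : str
S3 region
extra : Dict[str, Any], default: {}
Additional S3 configuration parameters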
Examples
from onetl.connection import Iceberg
warehouse = Iceberg.S3Warehouse(
path="/warehouse",
host="s3.domain.com",
protocol="http",
bucket="my-bucket",
region="us-east-1",
path_style_access=True,
access_key="my_access_key",
secret_key="my_secret_key",
)
Source code in onetl/connection/db_connection/iceberg/warehouse/s3.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@support_hooks
class IcebergS3Warehouse(IcebergWarehouse, FrozenModel):
    """Iceberg S3 Warehouse. [](/hooks/)

    !!! success "Added in 0.15.0"

    !!! note

        This warehouse uses a **dedicated Iceberg S3 client** to access the warehouse data.
        It does **not rely on SparkS3 implementation**.

    Parameters
    ----------
    path : str
        Warehouse path inside the bucket. A leading ``/`` is stripped when the
        warehouse URL is built.
    host : str
        S3 endpoint hostname
    port : int, optional
        S3 endpoint port. If not set, no port is added to the endpoint URL.
    protocol : Literal["http", "https"], default: "https"
        Protocol to use for S3 connections
    bucket : str
        S3 bucket name
    path_style_access : bool, default: False
        Whether to use path-style access
    access_key : str, optional
        S3 access key
    secret_key : str, optional
        S3 secret key
    session_token : str, optional
        S3 session token for temporary credentials
    region : str
        S3 region
    extra : Dict[str, Any], default: {}
        Additional S3 configuration parameters. Values are converted with
        ``stringify`` and merged into the config returned by ``get_config``.
        Keys given here take precedence over the generated options.

    Examples
    --------

    ```python
    from onetl.connection import Iceberg

    warehouse = Iceberg.S3Warehouse(
        path="/warehouse",
        host="s3.domain.com",
        protocol="http",
        bucket="my-bucket",
        region="us-east-1",
        path_style_access=True,
        access_key="my_access_key",
        secret_key="my_secret_key",
    )
    ```
    """

    path: PurePathProtocol
    host: str
    port: Optional[int] = None
    protocol: Literal["http", "https"] = "https"
    bucket: str
    region: str
    path_style_access: bool = False
    access_key: Optional[str] = None
    secret_key: Optional[SecretStr] = None
    session_token: Optional[SecretStr] = None
    extra: Dict[str, Any] = Field(default_factory=dict)

    @slot
    def get_config(self) -> Dict[str, str]:
        """Build the Iceberg ``S3FileIO`` properties for this warehouse.

        Options whose value is ``None`` (e.g. credentials that were not set) are
        omitted from the result. Entries from ``extra`` are unpacked last, so they
        override any generated option with the same key.
        """
        # Truthiness check (not ``is not None``) is intentional: matches the
        # original behavior, where a falsy port is left out of the endpoint.
        port_suffix = f":{self.port}" if self.port else ""
        relative_path = self.path.as_posix().lstrip("/")

        config = {
            "warehouse": f"s3a://{self.bucket}/{relative_path}",
            "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            "s3.endpoint": f"{self.protocol}://{self.host}{port_suffix}",
            "s3.access-key-id": self.access_key,
            "s3.secret-access-key": self.secret_key.get_secret_value() if self.secret_key else None,
            "s3.session-token": self.session_token.get_secret_value() if self.session_token else None,
            "s3.path-style-access": stringify(self.path_style_access),
            "client.region": self.region,
            # must stay last: user-provided options win over generated ones
            **stringify(self.extra),
        }
        return {key: value for key, value in config.items() if value is not None}

    @slot
    @classmethod
    def get_packages(cls, package_version: str) -> List[str]:
        """
        Get package names to be downloaded by Spark. [](/hooks/)

        See [Maven package index](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle)
        for all available packages.

        Parameters
        ----------
        package_version : str
            Iceberg package version in format `major.minor.patch`.

        Returns
        -------
        list[str]
            List of Maven coordinates.

        Examples
        --------

        ```python
        from onetl.connection import Iceberg

        # Note: Iceberg 1.10.0 requires Java 11+
        Iceberg.S3Warehouse.get_packages(package_version="1.10.0")
        ```
        """
        version = Version(package_version).min_digits(3)
        return [f"org.apache.iceberg:iceberg-aws-bundle:{version}"]

    @validator("path", pre=True)
    def _validate_path(cls, path, values):
        # Accept any str / os.PathLike input and normalize it to a RemotePath.
        return RemotePath(os.fspath(path))
|
get_packages(package_version)
classmethod
Get package names to be downloaded by Spark. 
See Maven package index
for all available packages.
Parameters
package_version : str
Iceberg package version in format major.minor.patch.
Returns
list[str]
List of Maven coordinates.
Examples
from onetl.connection import Iceberg
# Note: Iceberg 1.10.0 requires Java 11+
Iceberg.S3Warehouse.get_packages(package_version="1.10.0")
Source code in onetl/connection/db_connection/iceberg/warehouse/s3.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@slot
@classmethod
def get_packages(cls, package_version: str) -> List[str]:
    """
    Get package names to be downloaded by Spark. [](/hooks/)

    See [Maven package index](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle)
    for all available packages.

    Parameters
    ----------
    package_version : str
        Iceberg package version in format `major.minor.patch`.
        It is normalized via ``Version(...).min_digits(3)`` before use.

    Returns
    -------
    list[str]
        List of Maven coordinates (a single ``iceberg-aws-bundle`` entry).

    Examples
    --------

    ```python
    from onetl.connection import Iceberg

    # Note: Iceberg 1.10.0 requires Java 11+
    Iceberg.S3Warehouse.get_packages(package_version="1.10.0")
    ```
    """
    normalized_version = Version(package_version).min_digits(3)
    bundle_coordinate = f"org.apache.iceberg:iceberg-aws-bundle:{normalized_version}"
    return [bundle_coordinate]
|