Skip to content

S3 хранилище

Bases: IcebergWarehouse, FrozenModel

Iceberg S3 Warehouse. support hooks

Added in 0.15.0

Note

This warehouse uses a dedicated Iceberg S3 client to access the warehouse data. It does not rely on the SparkS3 implementation.

Parameters

path : str Warehouse path

host : str S3 endpoint hostname

port : int, optional S3 endpoint port

protocol : Literal["http", "https"], default: "https" Protocol to use for S3 connections

bucket : str S3 bucket name

path_style_access : bool, default: False Whether to use path-style access

access_key : str, optional S3 access key

secret_key : str, optional S3 secret key

session_token : str, optional S3 session token for temporary credentials

region : str S3 region

extra : Dict[str, str], default: {} Additional S3 configuration parameters

Examples

from onetl.connection import Iceberg

warehouse = Iceberg.S3Warehouse(
    path="/warehouse",
    host="s3.domain.com",
    protocol="http",
    bucket="my-bucket",
    region="us-east-1",
    path_style_access=True,
    access_key="my_access_key",
    secret_key="my_secret_key",
)
Source code in onetl/connection/db_connection/iceberg/warehouse/s3.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@support_hooks
class IcebergS3Warehouse(IcebergWarehouse, FrozenModel):
    """Iceberg warehouse located in an S3 bucket. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    !!! success "Added in 0.15.0"

    !!! note

        Warehouse data is accessed through a **dedicated Iceberg S3 client**
        (`org.apache.iceberg.aws.s3.S3FileIO`), so this class does
        **not rely on SparkS3 implementation**.

    Parameters
    ----------
    path : str
        Warehouse path inside the bucket. Leading slashes are stripped
        when the warehouse URL is built.

    host : str
        S3 endpoint hostname

    port : int, optional
        S3 endpoint port. Appended to the endpoint URL only when set.

    protocol : Literal["http", "https"], default: "https"
        Protocol to use for S3 connections

    bucket : str
        S3 bucket name

    path_style_access : bool, default: False
        Whether to use path-style access

    access_key : str, optional
        S3 access key

    secret_key : str, optional
        S3 secret key

    session_token : str, optional
        S3 session token for temporary credentials

    region : str
        S3 region

    extra : Dict[str, Any], default: {}
        Additional S3 configuration parameters. They are stringified and
        take precedence over the options generated from the other fields.

    Examples
    --------

    ```python
    from onetl.connection import Iceberg

    warehouse = Iceberg.S3Warehouse(
        path="/warehouse",
        host="s3.domain.com",
        protocol="http",
        bucket="my-bucket",
        region="us-east-1",
        path_style_access=True,
        access_key="my_access_key",
        secret_key="my_secret_key",
    )
    ```
    """

    # Location of the warehouse
    path: PurePathProtocol
    host: str
    port: Optional[int] = None
    protocol: Literal["http", "https"] = "https"
    bucket: str
    region: str
    path_style_access: bool = False
    # Credentials; every one of them may be omitted
    access_key: Optional[str] = None
    secret_key: Optional[SecretStr] = None
    session_token: Optional[SecretStr] = None
    # Free-form options merged into the resulting config
    extra: Dict[str, Any] = Field(default_factory=dict)

    @slot
    def get_config(self) -> dict[str, str]:
        """
        Build Iceberg catalog options describing this warehouse.

        Returns
        -------
        dict[str, str]
            Option name to value mapping. Options whose value is ``None``
            (e.g. credentials which were not set) are omitted.
        """
        relative_path = self.path.as_posix().lstrip("/")

        endpoint = f"{self.protocol}://{self.host}"
        if self.port:
            endpoint += f":{self.port}"

        # Secrets are unwrapped only here, at the point the config is produced
        secret_key = None
        if self.secret_key:
            secret_key = self.secret_key.get_secret_value()

        session_token = None
        if self.session_token:
            session_token = self.session_token.get_secret_value()

        options = {
            "warehouse": "s3a://" + self.bucket + "/" + relative_path,
            "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            "s3.endpoint": endpoint,
            "s3.access-key-id": self.access_key,
            "s3.secret-access-key": secret_key,
            "s3.session-token": session_token,
            "s3.path-style-access": stringify(self.path_style_access),
            "client.region": self.region,
            # unpacked last, so user-supplied extras override the keys above
            **stringify(self.extra),
        }
        return {name: value for name, value in options.items() if value is not None}

    @slot
    @classmethod
    def get_packages(cls, package_version: str) -> List[str]:
        """
        Return Maven packages Spark has to download to work with this warehouse. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

        All published versions are listed in the
        [Maven package index](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle).

        Parameters
        ----------
        package_version : str
            Iceberg package version in format `major.minor.patch`.

        Returns
        -------
        list[str]
            List of Maven coordinates.

        Examples
        --------
        ```python
        from onetl.connection import Iceberg

        # Note: Iceberg 1.10.0 requires Java 11+
        Iceberg.S3Warehouse.get_packages(package_version="1.10.0")
        ```
        """
        coordinates = f"org.apache.iceberg:iceberg-aws-bundle:{Version(package_version).min_digits(3)}"
        return [coordinates]

    @validator("path", pre=True)
    def _validate_path(cls, path, values):
        # Runs before field validation: convert any path-like input to RemotePath
        raw_path = os.fspath(path)
        return RemotePath(raw_path)

get_packages(package_version) classmethod

Get package names to be downloaded by Spark. support hooks

See Maven package index for all available packages.

Parameters

package_version : str Iceberg package version in format major.minor.patch.

Returns

list[str] List of Maven coordinates.

Examples

from onetl.connection import Iceberg

# Note: Iceberg 1.10.0 requires Java 11+
Iceberg.S3Warehouse.get_packages(package_version="1.10.0")
Source code in onetl/connection/db_connection/iceberg/warehouse/s3.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@slot
@classmethod
def get_packages(cls, package_version: str) -> List[str]:
    """
    Return Maven packages Spark has to download to work with this warehouse. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    All published versions are listed in the
    [Maven package index](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle).

    Parameters
    ----------
    package_version : str
        Iceberg package version in format `major.minor.patch`.

    Returns
    -------
    list[str]
        List of Maven coordinates.

    Examples
    --------
    ```python
    from onetl.connection import Iceberg

    # Note: Iceberg 1.10.0 requires Java 11+
    Iceberg.S3Warehouse.get_packages(package_version="1.10.0")
    ```
    """
    coordinates = f"org.apache.iceberg:iceberg-aws-bundle:{Version(package_version).min_digits(3)}"
    return [coordinates]