Skip to content

REST каталог

onetl.connection.db_connection.iceberg.catalog.rest.IcebergRESTCatalog

Bases: IcebergCatalog, FrozenModel

Iceberg REST Catalog.

Added in 0.15.0

Parameters

url : str REST catalog server URL

headers : dict[str, str], optional

Additional HTTP headers to include in requests

extra : dict[str, str], optional

Additional configuration parameters

auth : IcebergRESTCatalogAuth, optional

Authentication configuration

Examples
# Example: REST catalog with basic authentication
from onetl.connection import Iceberg

catalog = Iceberg.RESTCatalog(
    url="https://rest.domain.com:8080",
    auth=Iceberg.RESTCatalog.BasicAuth(
        user="my_user",
        password="my_password",
    ),
)
# Example: REST catalog with bearer token
from onetl.connection import Iceberg

catalog = Iceberg.RESTCatalog(
    url="https://rest.domain.com:8080",
    auth=Iceberg.RESTCatalog.BearerAuth(
        access_token="my_bearer_token",
    ),
)
# Example: REST catalog with OAuth2 Client Credentials Flow
from onetl.connection import Iceberg

catalog = Iceberg.RESTCatalog(
    url="https://rest.domain.com:8080",
    auth=Iceberg.RESTCatalog.OAuth2ClientCredentials(
        client_id="my_client_id",
        client_secret="my_client_secret",
    ),
)
# Example: REST catalog with custom auth (credentials passed as plain HTTP
# headers, plus extra catalog options)
from onetl.connection import Iceberg

catalog = Iceberg.RESTCatalog(
    url="https://rest.domain.com:8080",
    headers={
        "X-Custom-Auth": "my_custom_token",
        "X-Request-ID": "request-123",
    },
    extra={
        "timeout": "30s",
        "retry": "3",
    },
)

"""
These options will be passed to Spark config:
spark.sql.my_catalog.uri = "https://rest.domain.com:8080"
spark.sql.my_catalog.header.X-Custom-Auth = "my_custom_token"
spark.sql.my_catalog.header.X-Request-ID = "request-123"
spark.sql.my_catalog.timeout = "30s"
spark.sql.my_catalog.retry = "3"
"""
Source code in onetl/connection/db_connection/iceberg/catalog/rest.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class IcebergRESTCatalog(IcebergCatalog, FrozenModel):
    """Iceberg catalog reached through a REST catalog server.

    !!! success "Added in 0.15.0"

    Parameters
    ----------
    url : str
        URL of the REST catalog server

    headers : dict[str, str], optional
        Extra HTTP headers to attach to requests

    extra : dict[str, str], optional
        Extra configuration parameters

    auth : IcebergRESTCatalogAuth, optional
        Authentication settings

    Examples
    --------

    === "REST catalog with basic authentication"
        ```python
        from onetl.connection import Iceberg

        catalog = Iceberg.RESTCatalog(
            url="https://rest.domain.com:8080",
            auth=Iceberg.RESTCatalog.BasicAuth(
                user="my_user",
                password="my_password",
            ),
        )
        ```
    === "REST catalog with bearer token"
        ```python
        from onetl.connection import Iceberg

        catalog = Iceberg.RESTCatalog(
            url="https://rest.domain.com:8080",
            auth=Iceberg.RESTCatalog.BearerAuth(
                access_token="my_bearer_token",
            ),
        )
        ```
    === "REST catalog with OAuth2 Client Credentials Flow"
        ```python
        from onetl.connection import Iceberg

        catalog = Iceberg.RESTCatalog(
            url="https://rest.domain.com:8080",
            auth=Iceberg.RESTCatalog.OAuth2ClientCredentials(
                client_id="my_client_id",
                client_secret="my_client_secret",
            ),
        )
        ```
    === "REST catalog with custom auth"
        ```python
        from onetl.connection import Iceberg

        catalog = Iceberg.RESTCatalog(
            url="https://rest.domain.com:8080",
            headers={
                "X-Custom-Auth": "my_custom_token",
                "X-Request-ID": "request-123",
            },
            extra={
                "timeout": "30s",
                "retry": "3",
            },
        )

        \"\"\"
        These options will be passed to Spark config:
        spark.sql.my_catalog.uri = "https://rest.domain.com:8080"
        spark.sql.my_catalog.header.X-Custom-Auth = "my_custom_token"
        spark.sql.my_catalog.header.X-Request-ID = "request-123"
        spark.sql.my_catalog.timeout = "30s"
        spark.sql.my_catalog.retry = "3"
        \"\"\"
        ```
    """

    # Shortcuts so the auth classes can be reached as Iceberg.RESTCatalog.<Name>.
    BasicAuth = IcebergRESTCatalogBasicAuth
    BearerAuth = IcebergRESTCatalogBearerAuth
    OAuth2ClientCredentials = IcebergRESTCatalogOAuth2ClientCredentials

    # REST catalog server URL.
    url: AnyUrl
    # Each entry is emitted by get_config() under a "header.<name>" key.
    headers: Dict[str, Any] = Field(default_factory=dict)
    # Entries are emitted by get_config() under their own keys.
    extra: Dict[str, Any] = Field(default_factory=dict)

    # Optional authentication; its get_config() output is merged in last.
    auth: Optional[IcebergRESTCatalogAuth] = None

    def get_config(self) -> Dict[str, str]:
        """Build the catalog configuration as a flat mapping.

        Layers are applied in this order, with later layers overwriting
        earlier keys: the fixed ``type``/``uri`` entries, the stringified
        ``extra`` options, one ``header.<name>`` entry per item of
        ``headers``, and finally whatever ``auth.get_config()`` returns
        (only when ``auth`` is set).

        Returns
        -------
        dict
            The assembled configuration mapping.
        """
        settings: Dict[str, str] = {"type": "rest", "uri": str(self.url)}
        settings.update(stringify(self.extra))
        settings.update(
            {f"header.{name}": stringify(raw) for name, raw in self.headers.items()},
        )

        if not self.auth:
            return settings

        settings.update(self.auth.get_config())
        return settings

Аутентификация