Skip to content

Поток учетных данных клиента OAuth2

Bases: IcebergRESTCatalogAuth, FrozenModel

OAuth2 Client Credentials Flow authentication for Iceberg REST Catalog.

While creating new REST catalog session, new access token is fetched via OAuth2 server HTTP endpoint with grant_type=client_credentials.

After that, all requests to REST catalog are made with a HTTP header Authorization: Bearer {access_token}.

Added in 0.15.0

Parameters

client_secret : str OAuth2 client secret.

str, optional

OAuth2 client ID. In most OAuth2 server implementations it is mandatory.

timedelta, optional

Interval for automatic token refresh. Default: 1 hour. Set to None to disable automatic refresh.

str, optional

OAuth2 endpoint for fetching tokens. If not provided, uses the REST catalog's v1/oauth/tokens endpoint.

List[str], default: []

OAuth2 scopes to request.

str, optional

OAuth2 audience param.

str, optional

OAuth2 resource param.

Examples

from onetl.connection import Iceberg

auth = Iceberg.RESTCatalog.OAuth2ClientCredentials(
    client_id="my_client_id",
    client_secret="my_client_secret",
)
from datetime import timedelta
from onetl.connection import Iceberg

auth = Iceberg.RESTCatalog.OAuth2ClientCredentials(
    client_id="my_client_id",
    client_secret="my_client_secret",
    scopes=["catalog:read"],
    oauth2_token_endpoint="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
    token_refresh_interval=timedelta(minutes=30),
    audience="iceberg-catalog",
)
Source code in onetl/connection/db_connection/iceberg/catalog/auth/oauth2_client_credentials.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class IcebergRESTCatalogOAuth2ClientCredentials(IcebergRESTCatalogAuth, FrozenModel):
    """OAuth2 Client Credentials Flow authentication for Iceberg REST Catalog.

    While creating new REST catalog session, new access token is fetched via OAuth2 server HTTP endpoint
    with [grant_type=client_credentials](https://www.oauth.com/oauth2-servers/access-tokens/client-credentials/).

    After that, all requests to REST catalog are made with a HTTP header `Authorization: Bearer {access_token}`.

    !!! success "Added in 0.15.0"

    Parameters
    ----------
    client_secret : str
        OAuth2 client secret.

    client_id : str, optional
        OAuth2 client ID. In most OAuth2 server implementations it is [mandatory](https://www.oauth.com/oauth2-servers/client-registration/client-id-secret/).

    token_refresh_interval : timedelta, optional
        Interval for [automatic token refresh](https://www.oauth.com/oauth2-servers/access-tokens/refreshing-access-tokens/).
        Default: 1 hour. Set to `None` to disable automatic refresh.

    oauth2_token_endpoint : str, optional
        OAuth2 endpoint for fetching tokens. If not provided, uses the REST catalog's
        `v1/oauth/tokens` endpoint.

    scopes : List[str], default: []
        [OAuth2 scopes](https://www.oauth.com/oauth2-servers/scope/) to request.

    audience : str, optional
        OAuth2 `audience` param.

    resource : str, optional
        OAuth2 `resource` param.

    Examples
    --------

    === "OAuth2"
        ```python
        from onetl.connection import Iceberg

        auth = Iceberg.RESTCatalog.OAuth2ClientCredentials(
            client_id="my_client_id",
            client_secret="my_client_secret",
        )
        ```
    === "OAuth2 with optional fields"
        ```python
        from datetime import timedelta
        from onetl.connection import Iceberg

        auth = Iceberg.RESTCatalog.OAuth2ClientCredentials(
            client_id="my_client_id",
            client_secret="my_client_secret",
            scopes=["catalog:read"],
            oauth2_token_endpoint="http://keycloak.domain.com/realms/my-realm/protocol/openid-connect/token",
            token_refresh_interval=timedelta(minutes=30),
            audience="iceberg-catalog",
        )
        ```
    """

    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java#L81-L95
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java#L641
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java#L277-L300
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java#L188-L190

    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java#L49-L56
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java#L366
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java#L389-L404
    client_secret: SecretStr
    client_id: Optional[str] = None

    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java#L33-L39C58
    token_refresh_interval: Optional[timedelta] = timedelta(hours=1)

    # by default uses v1/oauth/tokens endpoint of RESTCatalog server
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java#L30-L31C30
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Manager.java#L275-L293
    # https://github.com/apache/iceberg/blob/720ef99720a1c59e4670db983c951243dffc4f3e/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java#L57-L59
    oauth2_token_endpoint: Optional[AnyUrl] = None

    scopes: List[str] = Field(default_factory=list)
    audience: Optional[str] = None
    resource: Optional[str] = None

    def get_config(self) -> dict[str, str]:
        config = {
            "rest.auth.type": "oauth2",
            "token-exchange-enabled": "false",
            "credential": (
                f"{self.client_id}:{self.client_secret.get_secret_value()}"
                if self.client_id is not None
                else self.client_secret.get_secret_value()
            ),
            "token-expires-in-ms": (
                str(int(self.token_refresh_interval.total_seconds() * 1000)) if self.token_refresh_interval else None
            ),
            "token-refresh-enabled": stringify(self.token_refresh_interval is not None),
            "oauth2-server-uri": str(self.oauth2_token_endpoint) if self.oauth2_token_endpoint else None,
            "scope": " ".join(self.scopes) if self.scopes else None,
            "audience": self.audience,
            "resource": self.resource,
        }
        return {k: v for k, v in config.items() if v is not None}