Skip to content

Слоты Kafka

Kafka slots that could be implemented by third-party plugins

.. versionadded:: 0.9.0

Source code in onetl/connection/db_connection/kafka/slots.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@support_hooks
class KafkaSlots:
    """
    Kafka slots that could be implemented by third-party plugins.

    Every method of this class is declared with ``@slot`` and contains only a docstring,
    so the slot has no implementation of its own here. A plugin provides one by binding
    a hook to the slot, as shown in the example of each method.

    .. versionadded:: 0.9.0
    """

    @slot
    @staticmethod
    def normalize_cluster_name(cluster: str) -> str | None:
        """
        Normalize the given Kafka cluster name.

        This can be used to ensure that the Kafka cluster name conforms to specific naming conventions.

        The slot body is empty; the behavior comes from a hook bound to it (see example below).

        .. versionadded:: 0.9.0

        Parameters
        ----------
        cluster : str
            The original Kafka cluster name.

        Returns
        -------
        str | None
            The normalized Kafka cluster name. If the hook cannot be applied, return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import Kafka
            from onetl.hooks import hook


            @Kafka.Slots.normalize_cluster_name.bind
            @hook
            def normalize_cluster_name(cluster: str) -> str | None:
                # e.g. "Kafka-Cluster" -> "kafka-cluster"
                return cluster.lower()
        """

    @slot
    @staticmethod
    def get_known_clusters() -> set[str] | None:
        """
        Retrieve the collection of known Kafka clusters.

        This can be used to validate if the provided Kafka cluster name is recognized in the system.

        The slot body is empty; the behavior comes from a hook bound to it (see example below).

        .. versionadded:: 0.9.0

        Returns
        -------
        set[str] | None
            A set of known Kafka cluster names. If the hook cannot be applied, return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import Kafka
            from onetl.hooks import hook


            @Kafka.Slots.get_known_clusters.bind
            @hook
            def get_known_clusters() -> set[str] | None:
                return {"kafka-cluster", "local"}
        """

    @slot
    @staticmethod
    def normalize_address(address: str, cluster: str) -> str | None:
        """
        Normalize the given broker address for a specific Kafka cluster.

        This can be used to format the broker address according to specific rules, such as adding default ports.

        The slot body is empty; the behavior comes from a hook bound to it (see example below).

        .. versionadded:: 0.9.0

        Parameters
        ----------
        address : str
            The original broker address.
        cluster : str
            The Kafka cluster name for which the address should be normalized.

        Returns
        -------
        str | None
            The normalized broker address. If the hook cannot be applied to the specific address, return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import Kafka
            from onetl.hooks import hook


            @Kafka.Slots.normalize_address.bind
            @hook
            def normalize_address(address: str, cluster: str) -> str | None:
                # address without an explicit port: append the port used by this cluster
                if cluster == "kafka-cluster" and ":" not in address:
                    return f"{address}:9092"
                # hook is not applicable to this address
                return None
        """

    @slot
    @staticmethod
    def get_cluster_addresses(cluster: str) -> list[str] | None:
        """
        Retrieve a collection of known broker addresses for the specified Kafka cluster.

        This can be used to obtain the broker addresses dynamically.

        The slot body is empty; the behavior comes from a hook bound to it (see example below).

        .. versionadded:: 0.9.0

        Parameters
        ----------
        cluster : str
            The Kafka cluster name.

        Returns
        -------
        list[str] | None
            A list of broker addresses for the specified Kafka cluster. If the hook cannot be applied, return ``None``.

        Examples
        --------

        .. code:: python

            from onetl.connection import Kafka
            from onetl.hooks import hook


            @Kafka.Slots.get_cluster_addresses.bind
            @hook
            def get_cluster_addresses(cluster: str) -> list[str] | None:
                if cluster == "kafka-cluster":
                    return ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"]
                # unknown cluster: hook is not applicable
                return None
        """

get_cluster_addresses(cluster) staticmethod

Retrieve a collection of known broker addresses for the specified Kafka cluster.

This can be used to obtain the broker addresses dynamically.

.. versionadded:: 0.9.0

Parameters

cluster : str The Kafka cluster name.

Returns

list[str] | None A collection of broker addresses for the specified Kafka cluster. If the hook cannot be applied, return None.

Examples

.. code:: python

from onetl.connection import Kafka
from onetl.hooks import hook


@Kafka.Slots.get_cluster_addresses.bind
@hook
def get_cluster_addresses(cluster: str) -> list[str] | None:
    if cluster == "kafka-cluster":
        return ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"]
    return None
Source code in onetl/connection/db_connection/kafka/slots.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@slot
@staticmethod
def get_cluster_addresses(cluster: str) -> list[str] | None:
    """
    Retrieve a collection of known broker addresses for the specified Kafka cluster.

    This can be used to obtain the broker addresses dynamically.

    The slot body is empty; the behavior comes from a hook bound to it (see example below).

    .. versionadded:: 0.9.0

    Parameters
    ----------
    cluster : str
        The Kafka cluster name.

    Returns
    -------
    list[str] | None
        A list of broker addresses for the specified Kafka cluster. If the hook cannot be applied, return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import Kafka
        from onetl.hooks import hook


        @Kafka.Slots.get_cluster_addresses.bind
        @hook
        def get_cluster_addresses(cluster: str) -> list[str] | None:
            if cluster == "kafka-cluster":
                return ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"]
            # unknown cluster: hook is not applicable
            return None
    """

get_known_clusters() staticmethod

Retrieve the collection of known Kafka clusters.

This can be used to validate if the provided Kafka cluster name is recognized in the system.

.. versionadded:: 0.9.0

Returns

set[str] | None A collection of known Kafka cluster names. If the hook cannot be applied, return None.

Examples

.. code:: python

from onetl.connection import Kafka
from onetl.hooks import hook


@Kafka.Slots.get_known_clusters.bind
@hook
def get_known_clusters() -> set[str] | None:
    return {"kafka-cluster", "local"}
Source code in onetl/connection/db_connection/kafka/slots.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@slot
@staticmethod
def get_known_clusters() -> set[str] | None:
    """
    Retrieve the collection of known Kafka clusters.

    This can be used to validate if the provided Kafka cluster name is recognized in the system.

    The slot body is empty; the behavior comes from a hook bound to it (see example below).

    .. versionadded:: 0.9.0

    Returns
    -------
    set[str] | None
        A set of known Kafka cluster names. If the hook cannot be applied, return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import Kafka
        from onetl.hooks import hook


        @Kafka.Slots.get_known_clusters.bind
        @hook
        def get_known_clusters() -> set[str] | None:
            return {"kafka-cluster", "local"}
    """

normalize_address(address, cluster) staticmethod

Normalize the given broker address for a specific Kafka cluster.

This can be used to format the broker address according to specific rules, such as adding default ports.

.. versionadded:: 0.9.0

Parameters

address : str The original broker address.

cluster : str The Kafka cluster name for which the address should be normalized.

Returns

str | None The normalized broker address. If the hook cannot be applied to the specific address, return None.

Examples

.. code:: python

from onetl.connection import Kafka
from onetl.hooks import hook


@Kafka.Slots.normalize_address.bind
@hook
def normalize_address(address: str, cluster: str) -> str | None:
    if cluster == "kafka-cluster" and ":" not in address:
        return f"{address}:9092"
    return None
Source code in onetl/connection/db_connection/kafka/slots.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@slot
@staticmethod
def normalize_address(address: str, cluster: str) -> str | None:
    """
    Normalize the given broker address for a specific Kafka cluster.

    This can be used to format the broker address according to specific rules, such as adding default ports.

    The slot body is empty; the behavior comes from a hook bound to it (see example below).

    .. versionadded:: 0.9.0

    Parameters
    ----------
    address : str
        The original broker address.
    cluster : str
        The Kafka cluster name for which the address should be normalized.

    Returns
    -------
    str | None
        The normalized broker address. If the hook cannot be applied to the specific address, return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import Kafka
        from onetl.hooks import hook


        @Kafka.Slots.normalize_address.bind
        @hook
        def normalize_address(address: str, cluster: str) -> str | None:
            # address without an explicit port: append the port used by this cluster
            if cluster == "kafka-cluster" and ":" not in address:
                return f"{address}:9092"
            # hook is not applicable to this address
            return None
    """

normalize_cluster_name(cluster) staticmethod

Normalize the given Kafka cluster name.

This can be used to ensure that the Kafka cluster name conforms to specific naming conventions.

.. versionadded:: 0.9.0

Parameters

cluster : str The original Kafka cluster name.

Returns

str | None The normalized Kafka cluster name. If the hook cannot be applied, return None.

Examples

.. code:: python

from onetl.connection import Kafka
from onetl.hooks import hook


@Kafka.Slots.normalize_cluster_name.bind
@hook
def normalize_cluster_name(cluster: str) -> str | None:
    return cluster.lower()
Source code in onetl/connection/db_connection/kafka/slots.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@slot
@staticmethod
def normalize_cluster_name(cluster: str) -> str | None:
    """
    Normalize the given Kafka cluster name.

    This can be used to ensure that the Kafka cluster name conforms to specific naming conventions.

    The slot body is empty; the behavior comes from a hook bound to it (see example below).

    .. versionadded:: 0.9.0

    Parameters
    ----------
    cluster : str
        The original Kafka cluster name.

    Returns
    -------
    str | None
        The normalized Kafka cluster name. If the hook cannot be applied, return ``None``.

    Examples
    --------

    .. code:: python

        from onetl.connection import Kafka
        from onetl.hooks import hook


        @Kafka.Slots.normalize_cluster_name.bind
        @hook
        def normalize_cluster_name(cluster: str) -> str | None:
            # e.g. "Kafka-Cluster" -> "kafka-cluster"
            return cluster.lower()
    """