Kafka Slots
Kafka slots that can be implemented by third-party plugins.
.. versionadded:: 0.9.0
Source code in onetl/connection/db_connection/kafka/slots.py
get_cluster_addresses(cluster)
staticmethod
Retrieve a collection of known broker addresses for the specified Kafka cluster.
This can be used to obtain the broker addresses dynamically.
.. versionadded:: 0.9.0
Parameters
cluster : str The Kafka cluster name.
Returns
list[str] | None
A collection of broker addresses for the specified Kafka cluster. If the hook cannot be applied, return None.
Examples
.. code:: python

    from onetl.connection import Kafka
    from onetl.hooks import hook


    @Kafka.Slots.get_cluster_addresses.bind
    @hook
    def get_cluster_addresses(cluster: str) -> list[str] | None:
        # Return the broker list only for the cluster this hook knows about
        if cluster == "kafka_cluster":
            return ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"]
        # None means the hook cannot be applied to this cluster
        return None

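To illustrate what a dynamically obtained broker list makes possible, the sketch below checks user-supplied addresses against it. It is plain Python and does not call onETL: ``BROKERS_BY_CLUSTER``, ``lookup_addresses`` and ``unregistered_addresses`` are hypothetical names introduced here. The sketch also assumes that a ``None`` result means "no information", so nothing is rejected. How the library itself consumes the slot result is not described in this section.

```python
from __future__ import annotations

# Hypothetical registry; a real plugin would query its own source of truth
BROKERS_BY_CLUSTER = {
    "kafka_cluster": ["192.168.1.1:9092", "192.168.1.2:9092", "192.168.1.3:9092"],
}


def lookup_addresses(cluster: str) -> list[str] | None:
    """Follow the slot contract: a list of brokers, or None if the cluster is unknown."""
    return BROKERS_BY_CLUSTER.get(cluster)


def unregistered_addresses(cluster: str, addresses: list[str]) -> list[str]:
    """Return the addresses that are not registered for the given cluster."""
    registered = lookup_addresses(cluster)
    if registered is None:
        # Assumption: without a broker list there is nothing to compare against
        return []
    return [address for address in addresses if address not in registered]
```

Returning the offending addresses, rather than a boolean, lets the caller build a precise error message.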
get_known_clusters()
staticmethod
Retrieve the collection of known Kafka clusters.
This can be used to validate if the provided Kafka cluster name is recognized in the system.
.. versionadded:: 0.9.0
Returns
set[str] | None
A collection of known Kafka cluster names. If the hook cannot be applied, return None.
Examples
.. code:: python

    from onetl.connection import Kafka
    from onetl.hooks import hook


    @Kafka.Slots.get_known_clusters.bind
    @hook
    def get_known_clusters() -> set[str] | None:
        # Names of all clusters that should be accepted
        return {"kafka-cluster", "local"}

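The validation use case mentioned above could look roughly like the following sketch. It is standalone Python, not onETL code: ``check_cluster`` is a hypothetical helper, and treating ``None`` as "skip validation" is an assumption made for the illustration.

```python
from __future__ import annotations


def check_cluster(cluster: str, known: set[str] | None) -> str:
    """Return the cluster name if it is acceptable, otherwise raise ValueError."""
    if known is None:
        # Assumption: no set of known clusters means validation is skipped
        return cluster
    if cluster not in known:
        raise ValueError(
            f"Unknown Kafka cluster {cluster!r}, expected one of {sorted(known)}"
        )
    return cluster
```

Sorting the known names in the error message keeps the output stable, since sets have no defined order.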
normalize_address(address, cluster)
staticmethod
Normalize the given broker address for a specific Kafka cluster.
This can be used to format the broker address according to specific rules, such as adding default ports.
.. versionadded:: 0.9.0
Parameters
address : str The original broker address.
cluster : str The Kafka cluster name for which the address should be normalized.
Returns
str | None
The normalized broker address. If the hook cannot be applied to the specific address, return None.
Examples
.. code:: python

    from onetl.connection import Kafka
    from onetl.hooks import hook


    @Kafka.Slots.normalize_address.bind
    @hook
    def normalize_address(address: str, cluster: str) -> str | None:
        # Append the default port when the address has none
        if cluster == "kafka-cluster" and ":" not in address:
            return f"{address}:9092"
        # None means the hook cannot be applied to this address
        return None

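Because the hook returns ``None`` for addresses it does not handle, a caller has to decide what to do in that case. The sketch below assumes the original address is kept unchanged. It is standalone Python in which ``add_default_port`` mirrors the hook body above and ``normalize_all`` is a hypothetical helper, not part of onETL.

```python
from __future__ import annotations


def add_default_port(address: str, cluster: str) -> str | None:
    """Same logic as the hook above, without the onETL decorators."""
    if cluster == "kafka-cluster" and ":" not in address:
        return f"{address}:9092"
    return None


def normalize_all(addresses: list[str], cluster: str) -> list[str]:
    """Normalize every address, keeping the original when the result is None."""
    result = []
    for address in addresses:
        normalized = add_default_port(address, cluster)
        # Assumption: None means "leave this address as it is"
        result.append(address if normalized is None else normalized)
    return result
```

With these definitions, ``normalize_all(["broker1", "broker2:9093"], "kafka-cluster")`` adds the port only to the first address, and addresses of any other cluster pass through untouched.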
normalize_cluster_name(cluster)
staticmethod
Normalize the given Kafka cluster name.
This can be used to ensure that the Kafka cluster name conforms to specific naming conventions.
.. versionadded:: 0.9.0
Parameters
cluster : str The original Kafka cluster name.
Returns
str | None
The normalized Kafka cluster name. If the hook cannot be applied, return None.
Examples
.. code:: python

    from onetl.connection import Kafka
    from onetl.hooks import hook


    @Kafka.Slots.normalize_cluster_name.bind
    @hook
    def normalize_cluster_name(cluster: str) -> str | None:
        # Treat cluster names as case-insensitive
        return cluster.lower()

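A naming convention can involve more than letter case. As a purely illustrative sketch, the function below also trims surrounding whitespace and replaces underscores with hyphens. The convention and the name ``to_canonical_name`` are assumptions made for this example, not rules imposed by onETL.

```python
def to_canonical_name(cluster: str) -> str:
    """Normalize a cluster name to a hypothetical lowercase, hyphenated form."""
    # Order matters only for readability here: trim, lowercase, then unify separators
    return cluster.strip().lower().replace("_", "-")
```

A function like this could serve as the body of a ``normalize_cluster_name`` hook, so that names such as ``Kafka_Cluster`` and ``kafka-cluster`` resolve to the same cluster.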