Bases: KafkaAuth, GenericOptions
Connect to Kafka using sasl.mechanism="GSSAPI".
For more details see:

* `Kafka Documentation <https://kafka.apache.org/documentation/#security_sasl_kerberos_clientconfig>`_
* `Krb5LoginModule documentation <https://docs.oracle.com/javase/8/docs/jre/api/security/jaas/spec/com/sun/security/auth/module/Krb5LoginModule.html>`_
.. versionadded:: 0.9.0
Examples
Auth in Kafka with keytab, automatically deploy keytab files to all Spark hosts (driver and executors):
.. code:: python
from onetl.connection import Kafka
auth = Kafka.KerberosAuth(
principal="user",
keytab="/path/to/keytab",
deploy_keytab=True,
)
Auth in Kafka with keytab, keytab is already deployed on all Spark hosts (driver and executors):
.. code:: python
from onetl.connection import Kafka
auth = Kafka.KerberosAuth(
principal="user",
keytab="/path/to/keytab",
deploy_keytab=False,
)
Auth in Kafka with existing Kerberos ticket (only Spark session created with master=local):
.. code:: python
from onetl.connection import Kafka
auth = Kafka.KerberosAuth(
principal="user",
use_keytab=False,
use_ticket_cache=True,
)
Pass custom options for JAAS config and Kafka SASL:
.. code:: python
from onetl.connection import Kafka
auth = Kafka.KerberosAuth.parse(
{
"principal": "user",
"keytab": "/path/to/keytab",
# options without sasl.kerberos. prefix are passed to JAAS config
# names are in camel case!
"isInitiator": True,
# options with `sasl.kerberos.` prefix are passed to Kafka client config as-is
"sasl.kerberos.kinit.cmd": "/usr/bin/kinit",
}
)
Source code in onetl/connection/db_connection/kafka/kafka_kerberos_auth.py
class KafkaKerberosAuth(KafkaAuth, GenericOptions):
    """
    Connect to Kafka using ``sasl.mechanism="GSSAPI"``.

    For more details see:

    * `Kafka Documentation <https://kafka.apache.org/documentation/#security_sasl_kerberos_clientconfig>`_
    * `Krb5LoginModule documentation <https://docs.oracle.com/javase/8/docs/jre/api/security/jaas/spec/com/sun/security/auth/module/Krb5LoginModule.html>`_

    .. versionadded:: 0.9.0

    Examples
    --------

    Auth in Kafka with keytab, automatically deploy keytab files to all Spark hosts (driver and executors):

    .. code:: python

        from onetl.connection import Kafka

        auth = Kafka.KerberosAuth(
            principal="user",
            keytab="/path/to/keytab",
            deploy_keytab=True,
        )

    Auth in Kafka with keytab, keytab is **already deployed** on all Spark hosts (driver and executors):

    .. code:: python

        from onetl.connection import Kafka

        auth = Kafka.KerberosAuth(
            principal="user",
            keytab="/path/to/keytab",
            deploy_keytab=False,
        )

    Auth in Kafka with existing Kerberos ticket (only Spark session created with ``master=local``):

    .. code:: python

        from onetl.connection import Kafka

        auth = Kafka.KerberosAuth(
            principal="user",
            use_keytab=False,
            use_ticket_cache=True,
        )

    Pass custom options for JAAS config and Kafka SASL:

    .. code:: python

        from onetl.connection import Kafka

        auth = Kafka.KerberosAuth.parse(
            {
                "principal": "user",
                "keytab": "/path/to/keytab",
                # options without sasl.kerberos. prefix are passed to JAAS config
                # names are in camel case!
                "isInitiator": True,
                # options with `sasl.kerberos.` prefix are passed to Kafka client config as-is
                "sasl.kerberos.kinit.cmd": "/usr/bin/kinit",
            }
        )
    """

    # Kerberos principal, e.g. ``user`` or ``user@REALM``
    principal: str
    # local keytab file; required when ``use_keytab`` is True (see ``_use_keytab``)
    keytab: Optional[LocalPath] = Field(default=None, alias="keyTab")
    # if True, copy the keytab to the current dir and ship it via ``SparkContext.addFile``
    deploy_keytab: bool = True
    # passed to Kafka client config as ``sasl.kerberos.service.name``
    service_name: str = Field(default="kafka", alias="serviceName")
    renew_ticket: bool = Field(default=True, alias="renewTicket")
    store_key: bool = Field(default=True, alias="storeKey")
    use_keytab: bool = Field(default=True, alias="useKeyTab")
    use_ticket_cache: bool = Field(default=False, alias="useTicketCache")

    # location of the deployed keytab copy; set by ``_prepare_keytab``, reset by ``cleanup``
    _keytab_path: Optional[LocalPath] = PrivateAttr(default=None)

    class Config:
        prohibited_options = PROHIBITED_OPTIONS
        known_options = KNOWN_OPTIONS
        # accept option names both with and without the ``kafka.`` prefix
        strip_prefixes = ["kafka."]
        extra = "allow"

    def get_jaas_conf(self, kafka: Kafka) -> str:
        """
        Render the ``sasl.jaas.config`` line for ``Krb5LoginModule``.

        All declared fields and extra options are serialized as JAAS options,
        except ``deploy_keytab`` and any ``sasl.*`` entries (the latter belong
        to the Kafka client config, see :obj:`~get_options`).
        """
        options = self.dict(
            by_alias=True,
            exclude_none=True,
            exclude={"deploy_keytab"},
        )
        if self.keytab:
            # replace the original keytab path with the (possibly deployed) one
            options["keyTab"] = self._prepare_keytab(kafka)
        jaas_conf = stringify({key: value for key, value in options.items() if not key.startswith("sasl.")}, quote=True)
        jaas_conf_items = [f"{key}={value}" for key, value in jaas_conf.items()]
        return "com.sun.security.auth.module.Krb5LoginModule required " + " ".join(jaas_conf_items) + ";"

    def get_options(self, kafka: Kafka) -> dict:
        """Return Kafka client config options enabling GSSAPI (Kerberos) authentication."""
        # pass any extra ``sasl.*`` options through to the Kafka client config as-is
        result = {
            key: value for key, value in self.dict(by_alias=True, exclude_none=True).items() if key.startswith("sasl.")
        }
        result.update(
            {
                "sasl.mechanism": "GSSAPI",
                "sasl.jaas.config": self.get_jaas_conf(kafka),
                "sasl.kerberos.service.name": self.service_name,
            },
        )
        return stringify(result)

    def cleanup(self, kafka: Kafka) -> None:
        """Remove the deployed keytab copy created by ``_prepare_keytab``, if any."""
        if self._keytab_path and self._keytab_path.exists():
            log.debug("Removing keytab from %s", path_repr(self._keytab_path))
            try:
                self._keytab_path.unlink()
            except OSError:
                # best-effort cleanup: log the failure instead of raising
                log.exception("Failed to remove keytab file '%s'", self._keytab_path)
        self._keytab_path = None

    @validator("keytab")
    def _validate_keytab(cls, value):
        # fail fast if the keytab file does not exist or is not readable
        return is_file_readable(value)

    @root_validator
    def _use_keytab(cls, values):
        # ``keytab`` is mandatory when keytab-based auth is enabled
        keytab = values.get("keytab")
        use_keytab = values.get("use_keytab")
        if use_keytab and not keytab:
            # NOTE: message matches the public alias casing ``useKeyTab`` (was "useKeytab")
            raise ValueError("keytab is required if useKeyTab is True")
        return values

    def _prepare_keytab(self, kafka: Kafka) -> str:
        """Deploy the keytab (if requested) and return the path to reference in the JAAS config."""
        keytab: LocalPath = self.keytab  # type: ignore[assignment]
        if not self.deploy_keytab:
            # keytab is expected to already exist at this exact path on every Spark host
            return os.fspath(keytab)

        # copy the keytab to the current directory, then ship it to all executors;
        # Spark places added files into the executor working directory, so only
        # the bare file name is returned for use in the JAAS config
        self._keytab_path = self._generate_keytab_path(keytab, self.principal)
        log.debug("Moving keytab from %s to %s", path_repr(keytab), path_repr(self._keytab_path))
        shutil.copy2(keytab, self._keytab_path)
        kafka.spark.sparkContext.addFile(os.fspath(self._keytab_path))
        return os.fspath(self._keytab_path.name)

    @staticmethod
    def _generate_keytab_path(keytab: LocalPath, principal: str) -> LocalPath:
        # Using hash in keytab name prevents collisions if there are several Kafka instances with
        # keytab paths like `/some/kafka.keytab` and `/another/kafka.keytab`.
        # Not using random values here to avoid generating too much garbage in current directory.
        # Also Spark copies all files to the executor working directory ignoring their paths,
        # so if there is some other connector deploy keytab file, these files will have different names.
        keytab_hash = get_file_hash(keytab, "md5")
        return LocalPath().joinpath(f"kafka_{principal}_{keytab_hash.hexdigest()}.keytab").resolve()
|
parse(options)
classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
If a Dict object was passed, it will be converted to ReadOptions.
Otherwise, an exception will be raised.
Source code in onetl/impl/generic_options.py
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    Normalize ``options`` into an instance of this class.

    ``None`` or any falsy value produces an instance with default values,
    a ``dict`` is validated and converted to this class, and an existing
    instance of this class is returned unchanged.
    Any other type raises ``TypeError``.
    """
    if not options:
        return cls()
    if isinstance(options, dict):
        return cls.parse_obj(options)
    if isinstance(options, cls):
        return options
    raise TypeError(
        f"{options.__class__.__name__} is not a {cls.__name__} instance",
    )
|