
Kafka PlaintextProtocol

onetl.connection.db_connection.kafka.kafka_plaintext_protocol.KafkaPlaintextProtocol

Bases: KafkaProtocol, FrozenModel

Connect to Kafka using PLAINTEXT or SASL_PLAINTEXT security protocols.

.. warning::

    Not recommended for use in production environments.
    Prefer :obj:`SSLProtocol <onetl.connection.db_connection.kafka.kafka_ssl_protocol.KafkaSSLProtocol>`.

.. versionadded:: 0.9.0

Examples
--------

.. code:: python

    # No options
    protocol = Kafka.PlaintextProtocol()

Source code in onetl/connection/db_connection/kafka/kafka_plaintext_protocol.py
class KafkaPlaintextProtocol(KafkaProtocol, FrozenModel):
    """
    Connect to Kafka using ``PLAINTEXT`` or ``SASL_PLAINTEXT`` security protocols.

    .. warning::

        Not recommended for use in production environments.
        Prefer :obj:`SSLProtocol <onetl.connection.db_connection.kafka.kafka_ssl_protocol.KafkaSSLProtocol>`.

    .. versionadded:: 0.9.0

    Examples
    --------

    .. code:: python

        # No options
        protocol = Kafka.PlaintextProtocol()
    """

    def get_options(self, kafka: Kafka) -> dict:
        # Access to Kafka is needed to select the type of protocol depending on the authentication scheme.
        if kafka.auth:
            return {"security.protocol": "SASL_PLAINTEXT"}
        return {"security.protocol": "PLAINTEXT"}

    def cleanup(self, kafka: Kafka) -> None:
        # nothing to clean up
        pass
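
The selection logic in ``get_options`` can be tried in isolation. The following is a minimal sketch that does not import onETL: ``FakeKafka`` and ``select_security_protocol`` are hypothetical stand-ins introduced only for illustration, and the sketch assumes that the only thing consulted is whether ``auth`` is set, as in the listing above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeKafka:
    """Hypothetical stand-in for the Kafka connection; only ``auth`` is modelled."""

    auth: Optional[object] = None


def select_security_protocol(kafka: FakeKafka) -> dict:
    # Mirrors get_options above: SASL_PLAINTEXT when an auth scheme is
    # configured, plain PLAINTEXT otherwise.
    if kafka.auth:
        return {"security.protocol": "SASL_PLAINTEXT"}
    return {"security.protocol": "PLAINTEXT"}


# No authentication configured.
anonymous = select_security_protocol(FakeKafka())

# Any truthy auth object switches the protocol; object() is a placeholder.
authenticated = select_security_protocol(FakeKafka(auth=object()))

print(anonymous)
print(authenticated)
```

Under these assumptions the protocol object itself carries no options: the resulting ``security.protocol`` value depends entirely on the connection it is attached to.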