Skip to content

Настройки Kafka

Bases: BaseModel

Data.Rentgen consumer Kafka-specific settings.

These options are passed directly to `AIOKafkaConsumer <https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer>`_.

Examples

.. code-block:: bash

DATA_RENTGEN__KAFKA__BOOTSTRAP_SERVERS="localhost:9092"
DATA_RENTGEN__KAFKA__SECURITY__TYPE=SCRAM-SHA-256
DATA_RENTGEN__KAFKA__REQUEST_TIMEOUT_MS=5000
DATA_RENTGEN__KAFKA__CONNECTIONS_MAX_IDLE_MS=540000
Source code in data_rentgen/consumer/settings/kafka.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class KafkaSettings(BaseModel):
    """Data.Rentgen consumer Kafka-specific settings.

    These options are passed directly to
    `AIOKafkaConsumer <https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer>`_.

    ``bootstrap_servers`` accepts a list, a JSON array string (``'["host1:9092", "host2:9092"]'``)
    or a comma-separated string (``"host1:9092,host2:9092"``).

    Examples
    --------

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__BOOTSTRAP_SERVERS="localhost:9092"
        DATA_RENTGEN__KAFKA__SECURITY__TYPE=SCRAM-SHA-256
        DATA_RENTGEN__KAFKA__REQUEST_TIMEOUT_MS=5000
        DATA_RENTGEN__KAFKA__CONNECTIONS_MAX_IDLE_MS=540000
    """

    bootstrap_servers: list[str] = Field(
        description="List of Kafka bootstrap servers.",
        min_length=1,
    )
    security: KafkaSecuritySettings = Field(
        default_factory=KafkaSecurityAnonymousSettings,
        description="Kafka security settings.",
    )
    compression: KafkaCompression | None = Field(
        default=None,
        description="Kafka message compression type.",
    )
    # Defaults are copied from FastStream: https://github.com/airtai/faststream/blob/0.5.33/faststream/kafka/fastapi/fastapi.py#L78
    # But only options, related to consuming messages
    request_timeout_ms: int = Field(
        default=40 * 1000,
        description="Client request timeout in milliseconds.",
    )
    retry_backoff_ms: int = Field(
        default=100,
        description="Milliseconds to backoff when retrying on errors.",
    )
    metadata_max_age_ms: int = Field(
        default=5 * 60 * 1000,
        description=textwrap.dedent(
            """
            The period of time in milliseconds after which we force a refresh of metadata,
            even if we haven't seen any partition leadership changes,
            to proactively discover any new brokers or partitions.
            """,
        ),
    )
    # ``None`` must be an accepted value, because the description below documents it
    # as the way to disable idle checks.
    connections_max_idle_ms: int | None = Field(
        default=9 * 60 * 1000,
        description=textwrap.dedent(
            """
            Close idle connections after the number of milliseconds specified by this config.
            Specifying ``None`` will disable idle checks.
            """,
        ),
    )

    @field_validator("bootstrap_servers", mode="before")
    @classmethod
    def _validate_bootstrap_servers(cls, value: Any):
        """Convert a string form of ``bootstrap_servers`` into a list before validation.

        Parameters
        ----------
        value
            Raw input. Anything that is not a ``str`` is returned untouched.

        Returns
        -------
        The parsed JSON value if the string is a JSON array, otherwise a list of
        comma-separated items with surrounding whitespace stripped.

        Raises
        ------
        json.JSONDecodeError
            If the string looks like a JSON array (starts with ``[`` and contains quotes)
            but cannot be parsed.
        """
        if not isinstance(value, str):
            return value

        text = value.strip()
        if text.startswith("["):
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                # A leading bracket is also how IPv6 hosts are written ("[::1]:9092"),
                # and such addresses never contain quotes. If quotes are present the user
                # meant a JSON array, so report the malformed JSON instead of guessing.
                if '"' in text:
                    raise

        return [item.strip() for item in value.split(",")]

Bases: KafkaSecurityBaseSettings

Kafka SCRAM-SHA-256 auth settings.

Examples

.. code-block:: bash

DATA_RENTGEN__KAFKA__SECURITY__TYPE=SCRAM-SHA-256
DATA_RENTGEN__KAFKA__SECURITY__USER=dummy
DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
Source code in data_rentgen/consumer/settings/security/scram.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class KafkaSecurityScram256Settings(KafkaSecurityBaseSettings):
    """Settings for Kafka authentication using SCRAM-SHA-256.

    Both ``user`` and ``password`` are required.

    Examples
    --------

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__SECURITY__TYPE=SCRAM-SHA-256
        DATA_RENTGEN__KAFKA__SECURITY__USER=dummy
        DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
    """

    type: Literal["SCRAM-SHA-256"] = "SCRAM-SHA-256"
    user: str = Field(description="Kafka security username")
    password: SecretStr = Field(description="Kafka security password")

    def to_security(self):
        """Build a ``SASLScram256`` object from the configured user and password."""
        username = self.user
        # The password is kept as SecretStr; the raw value is extracted only at this point.
        raw_password = self.password.get_secret_value()
        return SASLScram256(username, raw_password)

Bases: KafkaSecurityBaseSettings

Kafka SCRAM-SHA-512 auth settings.

Examples

.. code-block:: bash

DATA_RENTGEN__KAFKA__SECURITY__TYPE=SCRAM-SHA-512
DATA_RENTGEN__KAFKA__SECURITY__USER=dummy
DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
Source code in data_rentgen/consumer/settings/security/scram.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class KafkaSecurityScram512Settings(KafkaSecurityBaseSettings):
    """Settings for Kafka authentication using SCRAM-SHA-512.

    Both ``user`` and ``password`` are required.

    Examples
    --------

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__SECURITY__TYPE=SCRAM-SHA-512
        DATA_RENTGEN__KAFKA__SECURITY__USER=dummy
        DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
    """

    type: Literal["SCRAM-SHA-512"] = "SCRAM-SHA-512"
    user: str = Field(description="Kafka security username")
    password: SecretStr = Field(description="Kafka security password")

    def to_security(self):
        """Build a ``SASLScram512`` object from the configured user and password."""
        username = self.user
        # The password is kept as SecretStr; the raw value is extracted only at this point.
        raw_password = self.password.get_secret_value()
        return SASLScram512(username, raw_password)

Bases: KafkaSecurityBaseSettings

Kafka PLAINTEXT auth settings.

Examples

.. code-block:: bash

DATA_RENTGEN__KAFKA__SECURITY__TYPE=PLAINTEXT
DATA_RENTGEN__KAFKA__SECURITY__USER=dummy
DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
Source code in data_rentgen/consumer/settings/security/plain.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class KafkaSecurityPlaintextSettings(KafkaSecurityBaseSettings):
    """Settings for Kafka authentication using the PLAINTEXT method.

    Both ``user`` and ``password`` are required.

    Examples
    --------

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__SECURITY__TYPE=PLAINTEXT
        DATA_RENTGEN__KAFKA__SECURITY__USER=dummy
        DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
    """

    type: Literal["PLAINTEXT"] = "PLAINTEXT"
    user: str = Field(description="Kafka security username")
    password: SecretStr = Field(description="Kafka security password")

    def to_security(self):
        """Build a ``SASLPlaintext`` object from the configured user and password."""
        username = self.user
        # The password is kept as SecretStr; the raw value is extracted only at this point.
        raw_password = self.password.get_secret_value()
        return SASLPlaintext(username, raw_password)

Bases: KafkaSecurityBaseSettings

Kafka GSSAPI auth settings.

This auth method requires installing the ``data-rentgen[gssapi]`` extra, and relies on the presence of the ``kinit`` and ``kdestroy`` binaries in your operating system.

Examples

Using principal + password for calling kinit:

.. code-block:: bash

DATA_RENTGEN__KAFKA__SECURITY__TYPE=GSSAPI
DATA_RENTGEN__KAFKA__SECURITY__PRINCIPAL=dummy
DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
DATA_RENTGEN__KAFKA__SECURITY__REALM=MY.REALM.COM

Using principal + keytab for calling kinit:

.. code-block:: bash

DATA_RENTGEN__KAFKA__SECURITY__TYPE=GSSAPI
DATA_RENTGEN__KAFKA__SECURITY__PRINCIPAL=dummy
DATA_RENTGEN__KAFKA__SECURITY__KEYTAB=/etc/security/dummy.keytab
DATA_RENTGEN__KAFKA__SECURITY__REALM=MY.REALM.COM
Source code in data_rentgen/consumer/settings/security/gssapi.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class KafkaSecurityGSSAPISettings(KafkaSecurityBaseSettings):
    """Kafka GSSAPI auth settings.

    This auth method requires installing ``data-rentgen[gssapi]`` extra,
    and relies on presence of ``kinit`` and ``kdestroy`` binaries in your operating system.

    Exactly one of ``password`` and ``keytab`` must be set.

    Examples
    --------

    Using principal + password for calling ``kinit``:

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__SECURITY__TYPE=GSSAPI
        DATA_RENTGEN__KAFKA__SECURITY__PRINCIPAL=dummy
        DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
        DATA_RENTGEN__KAFKA__SECURITY__REALM=MY.REALM.COM

    Using principal + keytab for calling ``kinit``:

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__SECURITY__TYPE=GSSAPI
        DATA_RENTGEN__KAFKA__SECURITY__PRINCIPAL=dummy
        DATA_RENTGEN__KAFKA__SECURITY__KEYTAB=/etc/security/dummy.keytab
        DATA_RENTGEN__KAFKA__SECURITY__REALM=MY.REALM.COM
    """

    type: Literal["GSSAPI"] = "GSSAPI"
    principal: str = Field(description="Kerberos principal")
    # An explicit default is required here: without it the field is mandatory,
    # and the keytab-only configuration shown in the examples fails with "Field required".
    password: SecretStr | None = Field(
        default=None,
        description="Kerberos password. Mutually exclusive with keytab",
    )
    keytab: FilePath | None = Field(default=None, description="Path to keytab file. Mutually exclusive with password")
    realm: str | None = Field(default=None, description="Kerberos realm")
    service_name: str = Field(default="kafka", description="Kerberos service name")
    refresh_interval_seconds: int = Field(default=60 * 60, description="Refresh Kerberos ticket every N seconds")

    @model_validator(mode="after")
    def _check_password_or_keytab(self):
        """Ensure that exactly one of ``password`` and ``keytab`` is provided.

        Raises
        ------
        ValueError
            If both fields are empty, or both are set.
        """
        if not (self.password or self.keytab):
            msg = "input should contain either 'password' or 'keytab' field, both are empty"
            raise ValueError(msg)
        if self.password and self.keytab:
            msg = "input should contain either 'password' or 'keytab' field, both are set"
            raise ValueError(msg)
        return self

    def to_security(self):
        """Return a ``SASLGSSAPI`` security object. Credentials are not passed to it."""
        return SASLGSSAPI()

    def extra_broker_kwargs(self) -> dict[str, Any]:
        """Return Kerberos-related keyword arguments built from ``service_name`` and ``realm``.

        NOTE(review): presumably these are forwarded to the Kafka broker/client constructor;
        the caller is not visible here.
        """
        return {
            "sasl_kerberos_service_name": self.service_name,
            "sasl_kerberos_domain_name": self.realm,
        }

    async def kinit_password(self) -> None:
        """Run ``kinit <principal>``, passing the password via stdin.

        Raises
        ------
        subprocess.CalledProcessError
            If ``kinit`` exits with non-zero return code.
        """
        cmd = ["kinit", self.principal]
        # Password is never a part of the command line, so it is safe to log the command
        logger.debug("Calling command: %s", " ".join(cmd))

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            # do not show user 'Please enter password' banner
            stdout=asyncio.subprocess.PIPE,
            # do not capture stderr, immediately show all errors to user
        )
        password: str = self.password.get_secret_value()  # type: ignore[assignment]
        # stderr is not piped, so the second item returned by communicate() is None
        stdout, stderr = await process.communicate(password.encode("utf-8"))
        retcode = await process.wait()
        if retcode:
            raise subprocess.CalledProcessError(retcode, cmd, stdout, stderr)

    async def kinit_keytab(self) -> None:
        """Run ``kinit <principal> -k -t <keytab>`` using the resolved keytab path.

        Raises
        ------
        subprocess.CalledProcessError
            If ``kinit`` exits with non-zero return code.
        """
        keytab: Path = self.keytab.resolve()  # type: ignore[assignment]
        cmd = ["kinit", self.principal, "-k", "-t", os.fspath(keytab)]
        logger.debug("Calling command: %s", " ".join(cmd))

        process = await asyncio.create_subprocess_exec(*cmd)
        retcode = await process.wait()
        if retcode:
            raise subprocess.CalledProcessError(retcode, cmd)

    async def kinit_refresh(self) -> None:
        """Run ``kinit -R <principal>`` to renew the existing ticket.

        Raises
        ------
        subprocess.CalledProcessError
            If ``kinit`` exits with non-zero return code.
        """
        cmd = ["kinit", "-R", self.principal]
        logger.debug("Calling command: %s", " ".join(cmd))

        process = await asyncio.create_subprocess_exec(*cmd)
        retcode = await process.wait()
        if retcode:
            raise subprocess.CalledProcessError(retcode, cmd)

    async def kdestroy(self) -> None:
        """Run ``kdestroy -p <principal>``.

        Raises
        ------
        subprocess.CalledProcessError
            If ``kdestroy`` exits with non-zero return code.
        """
        cmd = ["kdestroy", "-p", self.principal]
        logger.debug("Calling command: %s", " ".join(cmd))

        process = await asyncio.create_subprocess_exec(*cmd)
        retcode = await process.wait()
        if retcode:
            raise subprocess.CalledProcessError(retcode, cmd)

    async def initialize(self) -> None:
        """Obtain a ticket using keytab if it is set, otherwise using password."""
        if self.keytab:
            await self.kinit_keytab()
        elif self.password:
            await self.kinit_password()

    async def refresh(self) -> None:
        """Endlessly refresh the ticket every ``refresh_interval_seconds`` seconds.

        Each iteration first tries ``kinit -R``; if that fails with any ``Exception``,
        a new ticket is requested via :meth:`initialize`. Errors raised by
        :meth:`initialize` are not suppressed and stop the loop.
        """
        while True:
            await anyio.sleep(self.refresh_interval_seconds)

            with suppress(Exception):
                # if ticket is renewable, try to refresh it
                await self.kinit_refresh()
                # reached only if refresh succeeded; skip re-initialization below
                continue

            await self.initialize()

    async def destroy(self) -> None:
        """Call :meth:`kdestroy`, ignoring any ``Exception`` it raises (best-effort cleanup)."""
        with suppress(Exception):
            await self.kdestroy()

Bases: KafkaSecurityBaseSettings

Kafka anonymous auth settings.

Examples

.. code-block:: bash

DATA_RENTGEN__KAFKA__SECURITY__TYPE=None
Source code in data_rentgen/consumer/settings/security/anonymous.py
 8
 9
10
11
12
13
14
15
16
17
18
19
class KafkaSecurityAnonymousSettings(KafkaSecurityBaseSettings):
    """Kafka anonymous auth settings.

    Declares no credential fields; the only field is ``type``, fixed to ``None``.

    Examples
    --------

    .. code-block:: bash

        DATA_RENTGEN__KAFKA__SECURITY__TYPE=None
    """

    # NOTE(review): presumably ``type`` is the tag used to pick the security settings class,
    # like the string literals in sibling classes - confirm against KafkaSecuritySettings.
    type: Literal[None] = None  # noqa: PYI061