Writing to Kafka

To write data to Kafka, use DBWriter with the options described below.

DataFrame schema

Unlike other database connections, Kafka has no concept of columns. All messages in a topic share the same set of fields, and only some of them can be written:

```text
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- headers: array (nullable = true)
    |-- element: struct (containsNull = true)
        |-- key: string (nullable = false)
        |-- value: binary (nullable = true)
```

The headers field can be written only with Kafka.WriteOptions(include_headers=True) (requires Kafka 2.0 or higher).

The topic field must not be present in the DataFrame, since the topic name is passed via DBWriter(target=...).

Other fields, such as partition, offset and timestamp, are set by Kafka itself and cannot be passed explicitly.
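For intuition, the writable shape of a single record can be sketched in plain Python (illustrative only; actual writes go through a Spark DataFrame with the schema shown above):

```python
# Illustrative shape of one Kafka record as seen by the writer:
# only key, value and headers can be supplied by the user.
record = {
    "key": b"user-123",                    # binary, optional
    "value": b'{"event": "click"}',        # binary, optional
    "headers": [("trace-id", b"abc123")],  # array of (string key, binary value) pairs
}

# Fields like partition, offset and timestamp are assigned by Kafka itself,
# so a writable record may contain only this subset of fields:
writable_fields = {"key", "value", "headers"}
assert set(record) <= writable_fields
```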

Value serialization

To write a value or key field of a non-binary type (for example, a struct or an integer), you must serialize the values manually.

This can be done with the following methods:

  • Avro.serialize_column
  • JSON.serialize_column
  • CSV.serialize_column
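Conceptually, these methods turn a structured value into bytes in the chosen format. A minimal plain-Python sketch of the JSON case (not the onETL API itself; JSON.serialize_column performs the equivalent conversion column-wise on a Spark column):

```python
import json

# A structured value that cannot be written to Kafka as-is:
value = {"user_id": 123, "event": "click"}

# Manual serialization to JSON bytes -- the binary shape
# Kafka's `value` field expects:
serialized = json.dumps(value).encode("utf-8")
```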

Examples

Converting value to a JSON string and writing it to Kafka:

```python
from onetl.connection import Kafka
from onetl.db import DBWriter
from onetl.file.format import JSON

df = ...  # source data here

# serialize structured data to JSON
json = JSON()
write_df = df.select(
    df.key,
    json.serialize_column(df.value),
)

# write data to Kafka
kafka = Kafka(...)

writer = DBWriter(
    connection=kafka,
    target="topic_name",
)
writer.run(write_df)
```

Parameters

onetl.connection.db_connection.kafka.options.KafkaWriteOptions

Bases: GenericOptions

Writing options for Kafka connector.

Warning

The options:
  • kafka.*
  • topic

are populated from connection attributes, and cannot be overridden by the user in WriteOptions to avoid issues.

Added in version 0.9.0.

Examples

Note

You can pass any value supported by the connector (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html), even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on the connector version.

```python
from onetl.connection import Kafka

options = Kafka.WriteOptions(
    if_exists="append",
    includeHeaders=True,
)
```
Source code in onetl/connection/db_connection/kafka/options.py
```python
class KafkaWriteOptions(GenericOptions):
    """Writing options for Kafka connector.

    .. warning::

        Options:
            * ``kafka.*``
            * ``topic``

        are populated from connection attributes, and cannot be overridden by the user in ``WriteOptions`` to avoid issues.

    .. versionadded:: 0.9.0

    Examples
    --------

    .. note ::

        You can pass any value
        `supported by connector <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on connector version.

    .. code:: python

        from onetl.connection import Kafka

        options = Kafka.WriteOptions(
            if_exists="append",
            includeHeaders=True,
        )
    """

    if_exists: KafkaTopicExistBehaviorKafka = Field(default=KafkaTopicExistBehaviorKafka.APPEND)
    """Behavior of writing data into existing topic.

    Same as ``df.write.mode(...)``.

    Possible values:
        * ``append`` (default) - Adds new objects into existing topic.
        * ``error`` - Raises an error if topic already exists.
    """

    include_headers: bool = Field(default=False, alias="includeHeaders")
    """
    If ``True``, ``headers`` column from dataframe can be written to Kafka (requires Kafka 2.0+).

    If ``False`` and dataframe contains ``headers`` column, an exception will be raised.
    """

    class Config:
        prohibited_options = PROHIBITED_OPTIONS | KNOWN_READ_OPTIONS
        known_options: frozenset[str] = frozenset()
        extra = "allow"

    @root_validator(pre=True)
    def _mode_is_restricted(cls, values):
        if "mode" in values:
            raise ValueError("Parameter `mode` is not allowed. Please use `if_exists` parameter instead.")
        return values
```

if_exists = Field(default=KafkaTopicExistBehaviorKafka.APPEND)

Behavior of writing data into existing topic.

Same as df.write.mode(...).

Possible values
  • append (default) - Adds new objects into existing topic.
  • error - Raises an error if topic already exists.
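The difference between the two modes can be sketched in plain Python (illustrative only; the real behavior is implemented by the Kafka connector, not this function):

```python
# Illustrative sketch of `append` vs `error` when the target topic
# already holds data.
def write(store: dict, topic: str, rows: list, if_exists: str = "append") -> None:
    if topic in store and if_exists == "error":
        raise ValueError(f"Topic {topic!r} already exists")
    # append mode: add new rows, keeping the existing ones
    store.setdefault(topic, []).extend(rows)

store = {"events": [b"old"]}
write(store, "events", [b"new"])  # append (default): keeps existing rows
```

With if_exists="error" the same call would raise instead of writing, because the topic already exists.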

include_headers = Field(default=False, alias='includeHeaders')

If True, headers column from dataframe can be written to Kafka (requires Kafka 2.0+).

If False and dataframe contains headers column, an exception will be raised.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised.

Source code in onetl/impl/generic_options.py
```python
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options
```
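The same three-way dispatch can be reproduced standalone. This sketch uses a plain dataclass in place of GenericOptions (the class name and field are illustrative, not the onETL API):

```python
from dataclasses import dataclass


@dataclass
class Options:
    """Stand-in for a GenericOptions subclass (illustrative)."""

    if_exists: str = "append"

    @classmethod
    def parse(cls, options: "Options | dict | None") -> "Options":
        # None or empty -> default options
        if not options:
            return cls()
        # dict -> converted to an Options instance
        if isinstance(options, dict):
            return cls(**options)
        # wrong type -> error
        if not isinstance(options, cls):
            raise TypeError(f"{options.__class__.__name__} is not a {cls.__name__} instance")
        # already the right type -> returned unchanged
        return options
```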