Запись в Kafka
Для записи данных в Kafka используйте DBWriter с определенными параметрами (см. ниже).
Схема DataFrame
В отличие от других подключений к БД, Kafka не имеет концепции столбцов. Все сообщения в топиках имеют одинаковый набор полей. Записать можно только некоторые из них:
```text
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- headers: struct (nullable = true)
|-- key: string (nullable = false)
|-- value: binary (nullable = true)
```
Поле headers может быть передано только с Kafka.WriteOptions(include_headers=True) (для совместимости с Kafka 1.x).
Поле topic не должно присутствовать в DataFrame, так как оно передается в DBWriter(target=...).
Другие поля, такие как partition, offset, timestamp, устанавливаются Kafka и не могут быть явно переданы.
Сериализация значений
Для записи поля value или key типа, отличного от байтов (например, структуры или целого числа), пользователи должны сериализовать значения вручную.
Это можно сделать с помощью следующих методов:
Avro.serialize_columnJSON.serialize_columnCSV.serialize_column
Примеры
Преобразование value в строку JSON и запись в Kafka:
```python
from onetl.connection import Kafka
from onetl.db import DBWriter
from onetl.file.format import JSON
df = ... # исходные данные здесь
# сериализация структурированных данных в JSON
json = JSON()
write_df = df.select(
df.key,
json.serialize_column(df.value),
)
# запись данных в Kafka
kafka = Kafka(...)
writer = DBWriter(
connection=kafka,
target="topic_name",
)
writer.run(write_df)
```
Параметры
onetl.connection.db_connection.kafka.options.KafkaWriteOptions
Bases: GenericOptions
Writing options for Kafka connector.
.. warning::
Options:
* ``kafka.*``
* ``topic``
are populated from connection attributes, and cannot be overridden by the user in ``WriteOptions`` to avoid issues.
.. versionadded:: 0.9.0
Examples
.. note ::
You can pass any value
`supported by connector <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!
The set of supported options depends on connector version.
.. code:: python
from onetl.connection import Kafka
options = Kafka.WriteOptions(
if_exists="append",
includeHeaders=True,
)
Source code in onetl/connection/db_connection/kafka/options.py
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | |
if_exists = Field(default=(KafkaTopicExistBehaviorKafka.APPEND))
class-attribute
instance-attribute
Behavior of writing data into existing topic.
Same as df.write.mode(...).
Possible values
append(default) - Adds new objects into existing topic.error- Raises an error if topic already exists.
include_headers = Field(default=False, alias='includeHeaders')
class-attribute
instance-attribute
If True, headers column from dataframe can be written to Kafka (requires Kafka 2.0+).
If False and dataframe contains headers column, an exception will be raised.
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | |