Запись в Kafka
Для записи данных в Kafka используйте DBWriter с определенными параметрами (см. ниже).
Схема DataFrame
В отличие от других подключений к БД, Kafka не имеет концепции столбцов. Все сообщения в топиках имеют одинаковый набор полей. Записать можно только некоторые из них:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- headers: struct (nullable = true)
|-- key: string (nullable = false)
|-- value: binary (nullable = true)
Поле headers может быть передано только с Kafka.WriteOptions(include_headers=True) (для совместимости с Kafka 1.x).
Поле topic не должно присутствовать в DataFrame, так как оно передается в DBWriter(target=...).
Другие поля, такие как partition, offset, timestamp, устанавливаются Kafka и не могут быть явно переданы.
Сериализация значений
Для записи поля value или key типа, отличного от байтов (например, структуры или целого числа), пользователи должны сериализовать значения вручную.
Это можно сделать с помощью следующих методов:
Примеры
Преобразование value в строку JSON и запись в Kafka:
from onetl.connection import Kafka
from onetl.db import DBWriter
from onetl.file.format import JSON
df = ... # исходные данные здесь
# сериализация структурированных данных в JSON
json = JSON()
write_df = df.select(
df.key,
json.serialize_column(df.value),
)
# запись данных в Kafka
kafka = Kafka(...)
writer = DBWriter(
connection=kafka,
target="topic_name",
)
writer.run(write_df)
Параметры
onetl.connection.db_connection.kafka.options.KafkaWriteOptions
Bases: GenericOptions
Writing options for Kafka connector.
Warning
Options:
* kafka.*
* topic
are populated from connection attributes,
and cannot be overridden by the user in WriteOptions to avoid issues.
Added in 0.9.0
Examples
Note
You can pass any value
supported by connector,
even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on connector version.
from onetl.connection import Kafka
options = Kafka.WriteOptions(
if_exists="append",
includeHeaders=True,
)
Source code in onetl/connection/db_connection/kafka/options.py
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | |
if_exists = Field(default=(KafkaTopicExistBehaviorKafka.APPEND))
class-attribute
instance-attribute
Behavior of writing data into existing topic.
Same as df.write.mode(...).
Possible values
append(default) - Adds new objects into existing topic.error- Raises an error if topic already exists.
include_headers = Field(default=False, alias='includeHeaders')
class-attribute
instance-attribute
If True, headers column from dataframe can be written to Kafka (requires Kafka 2.0+).
If False and dataframe contains headers column, an exception will be raised.
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | |