Чтение из Kafka
Данные могут быть прочитаны из Kafka в Spark с использованием DBReader. Также поддерживается стратегия для инкрементального чтения данных.
Поддерживаемые функции DBReader
- ❌
columns(не поддерживается Kafka) - ❌
where(не поддерживается Kafka) - ✅︎
hwm, поддерживаемые стратегии: - ✅︎ Snapshot
- ✅︎ Incremental
- ❌ Snapshot batch
- ❌ Incremental batch
- ❌
hint(не поддерживается Kafka) - ❌
df_schema(см. примечание ниже) - ✅︎
options(см. Kafka.ReadOptions)
Схема DataFrame
В отличие от других подключений к БД, Kafka не имеет концепции столбцов. Все сообщения топиков имеют одинаковый набор полей, структура представлена ниже:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = false)
|-- partition: integer (nullable = false)
|-- offset: integer (nullable = false)
|-- timestamp: timestamp (nullable = false)
|-- timestampType: integer (nullable = false)
|-- headers: struct (nullable = true)
|-- key: string (nullable = false)
|-- value: binary (nullable = true)
Поле headers присутствует в DataFrame только если передан параметр Kafka.ReadOptions(include_headers=True) (совместимость с Kafka 1.x).
Десериализация значений
Чтобы прочитать value или key другого типа, кроме байтов (например, структуру или целое число), пользователи должны десериализовать значения вручную.
Это можно сделать следующими методами:
Or any other method provided by Spark or third-larty libraries which can parse BinaryType() column into useful data.
GroupIds and offsets
Regular Kafka consumers use subscrube(topic) method to notify Kafka that some new data from Kafka should be send to consumer if available. Offsets read by group are committed to Kafka, to guarantee at-least-once even if consumer failed somethere.
Spark connector for Kafka is very different. It uses assign(topic) to read data manually from a topic. It doesn't commit offsets to Kafka, as the same data can be read multiple times, e.g. task failed and lost all its memory, new task will read this data again.
Примеры
Стратегия Snapshot, value - двоичные данные Avro:
from onetl.connection import Kafka
from onetl.db import DBReader, DBWriter
from onetl.file.format import Avro
from pyspark.sql.functions import decode
# read all topic data from Kafka
kafka = Kafka(...)
reader = DBReader(connection=kafka, source="avro_topic")
read_df = reader.run()
# parse Avro format to Spark struct
avro = Avro(
schema_dict={
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
],
}
)
deserialized_df = read_df.select(
# cast binary key to string
decode("key", "UTF-8").alias("key"),
avro.parse_column("value"),
)
Инкрементальная стратегия, value - строка JSON:
Note
В настоящее время коннектор Kafka поддерживает только HWM на основе поля offset. Другие поля, такие как timestamp, пока не поддерживаются.
from onetl.connection import Kafka
from onetl.db import DBReader, DBWriter
from onetl.file.format import JSON
from pyspark.sql.functions import decode
kafka = Kafka(...)
# read only new data from Kafka topic
reader = DBReader(
connection=kafka,
source="topic_name",
hwm=DBReader.AutoDetectHWM(name="kafka_hwm", expression="offset"),
)
with IncrementalStrategy():
read_df = reader.run()
# parse JSON format to Spark struct
json = JSON()
schema = StructType(
[
StructField("name", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
],
)
deserialized_df = read_df.select(
# cast binary key to string
decode("key", "UTF-8").alias("key"),
json.parse_column("value", json),
)
Опции
onetl.connection.db_connection.kafka.options.KafkaReadOptions
Bases: GenericOptions
Reading options for Kafka connector.
Warning
Options:
* assign
* endingOffsets
* endingOffsetsByTimestamp
* kafka.*
* startingOffsets
* startingOffsetsByTimestamp
* startingTimestamp
* subscribe
* subscribePattern
are populated from connection attributes,
and cannot be overridden by the user in ReadOptions to avoid issues.
Added in 0.9.0
Examples
Note
You can pass any value
supported by connector,
even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on connector version.
from onetl.connection import Kafka
options = Kafka.ReadOptions(
includeHeaders=False,
minPartitions=50,
)
Source code in onetl/connection/db_connection/kafka/options.py
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | |
include_headers = Field(default=False, alias='includeHeaders')
class-attribute
instance-attribute
If True, add headers column to output DataFrame.
If False, column will not be added.
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | |