Чтение из Kafka
Данные могут быть прочитаны из Kafka в Spark с использованием DBReader. Также поддерживается стратегия для инкрементального чтения данных.
Поддерживаемые функции DBReader
- ❌
columns(не поддерживается Kafka) - ❌
where(не поддерживается Kafka) - ✅︎
hwm, поддерживаемые стратегии: - ✅︎ Snapshot
- ✅︎ Incremental
- ❌ Snapshot batch
- ❌ Incremental batch
- ❌
hint(не поддерживается Kafka) - ❌
df_schema(см. примечание ниже) - ✅︎
options(см. Kafka.ReadOptions)
Схема DataFrame
В отличие от других подключений к БД, Kafka не имеет концепции столбцов. Все сообщения топиков имеют одинаковый набор полей, структура представлена ниже:
```text
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = false)
|-- partition: integer (nullable = false)
|-- offset: integer (nullable = false)
|-- timestamp: timestamp (nullable = false)
|-- timestampType: integer (nullable = false)
|-- headers: struct (nullable = true)
|-- key: string (nullable = false)
|-- value: binary (nullable = true)
```
Поле headers присутствует в DataFrame только если передан параметр Kafka.ReadOptions(include_headers=True) (совместимость с Kafka 1.x).
Десериализация значений
Чтобы прочитать value или key другого типа, кроме байтов (например, структуру или целое число), пользователи должны десериализовать значения вручную.
Это можно сделать следующими методами:
Avro.parse_columnJSON.parse_columnCSV.parse_columnXML.parse_column
Примеры
Стратегия Snapshot, value - двоичные данные Avro:
```python
from onetl.connection import Kafka
from onetl.db import DBReader, DBWriter
from onetl.file.format import Avro
from pyspark.sql.functions import decode
# read all topic data from Kafka
kafka = Kafka(...)
reader = DBReader(connection=kafka, source="avro_topic")
read_df = reader.run()
# parse Avro format to Spark struct
avro = Avro(
schema_dict={
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
],
}
)
deserialized_df = read_df.select(
# cast binary key to string
decode("key", "UTF-8").alias("key"),
avro.parse_column("value"),
)
```
Инкрементальная стратегия, value - строка JSON:
Note
В настоящее время коннектор Kafka поддерживает только HWM на основе поля offset. Другие поля, такие как timestamp, пока не поддерживаются.
from onetl.connection import Kafka
from onetl.db import DBReader, DBWriter
from onetl.file.format import JSON
from pyspark.sql.functions import decode
kafka = Kafka(...)
# read only new data from Kafka topic
reader = DBReader(
connection=kafka,
source="topic_name",
hwm=DBReader.AutoDetectHWM(name="kafka_hwm", expression="offset"),
)
with IncrementalStrategy():
read_df = reader.run()
# parse JSON format to Spark struct
json = JSON()
schema = StructType(
[
StructField("name", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
],
)
deserialized_df = read_df.select(
# cast binary key to string
decode("key", "UTF-8").alias("key"),
json.parse_column("value", json),
)
Опции
onetl.connection.db_connection.kafka.options.KafkaReadOptions
Bases: GenericOptions
Reading options for Kafka connector.
.. warning::
Options:
* ``assign``
* ``endingOffsets``
* ``endingOffsetsByTimestamp``
* ``kafka.*``
* ``startingOffsets``
* ``startingOffsetsByTimestamp``
* ``startingTimestamp``
* ``subscribe``
* ``subscribePattern``
are populated from connection attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.
.. versionadded:: 0.9.0
Examples
.. note ::
You can pass any value
`supported by connector <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!
The set of supported options depends on connector version.
.. code:: python
from onetl.connection import Kafka
options = Kafka.ReadOptions(
includeHeaders=False,
minPartitions=50,
)
Source code in onetl/connection/db_connection/kafka/options.py
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | |
include_headers = Field(default=False, alias='includeHeaders')
class-attribute
instance-attribute
If True, add headers column to output DataFrame.
If False, column will not be added.
parse(options)
classmethod
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised
Source code in onetl/impl/generic_options.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | |