Skip to content

Чтение из Kafka

Данные могут быть прочитаны из Kafka в Spark с использованием DBReader. Также поддерживается стратегия для инкрементального чтения данных.

Поддерживаемые функции DBReader

  • columns (не поддерживается Kafka)
  • where (не поддерживается Kafka)
  • ✅︎ hwm, поддерживаемые стратегии:
  • ✅︎ Snapshot
  • ✅︎ Incremental
  • Snapshot batch
  • Incremental batch
  • hint (не поддерживается Kafka)
  • df_schema (см. примечание ниже)
  • ✅︎ options (см. Kafka.ReadOptions)

Схема DataFrame

В отличие от других подключений к БД, Kafka не имеет концепции столбцов. Все сообщения топиков имеют одинаковый набор полей, структура представлена ниже:

    root
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = false)
    |-- partition: integer (nullable = false)
    |-- offset: integer (nullable = false)
    |-- timestamp: timestamp (nullable = false)
    |-- timestampType: integer (nullable = false)
    |-- headers: struct (nullable = true)
        |-- key: string (nullable = false)
        |-- value: binary (nullable = true)

Поле headers присутствует в DataFrame только если передан параметр Kafka.ReadOptions(include_headers=True) (совместимость с Kafka 1.x).

Десериализация значений

Чтобы прочитать value или key другого типа, кроме байтов (например, структуру или целое число), пользователи должны десериализовать значения вручную.

Это можно сделать следующими методами:

Or any other method provided by Spark or third-larty libraries which can parse BinaryType() column into useful data.

GroupIds and offsets

Regular Kafka consumers use subscrube(topic) method to notify Kafka that some new data from Kafka should be send to consumer if available. Offsets read by group are committed to Kafka, to guarantee at-least-once even if consumer failed somethere.

Spark connector for Kafka is very different. It uses assign(topic) to read data manually from a topic. It doesn't commit offsets to Kafka, as the same data can be read multiple times, e.g. task failed and lost all its memory, new task will read this data again.

Примеры

Стратегия Snapshot, value - двоичные данные Avro:

    from onetl.connection import Kafka
    from onetl.db import DBReader, DBWriter
    from onetl.file.format import Avro
    from pyspark.sql.functions import decode
    # read all topic data from Kafka
    kafka = Kafka(...)
    reader = DBReader(connection=kafka, source="avro_topic")
    read_df = reader.run()
    # parse Avro format to Spark struct
    avro = Avro(
        schema_dict={
            "type": "record",
            "name": "Person",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        }
    )
    deserialized_df = read_df.select(
        # cast binary key to string
        decode("key", "UTF-8").alias("key"),
        avro.parse_column("value"),
    ) 

Инкрементальная стратегия, value - строка JSON:

Note

В настоящее время коннектор Kafka поддерживает только HWM на основе поля offset. Другие поля, такие как timestamp, пока не поддерживаются.

    from onetl.connection import Kafka
    from onetl.db import DBReader, DBWriter
    from onetl.file.format import JSON
    from pyspark.sql.functions import decode

    kafka = Kafka(...)

    # read only new data from Kafka topic
    reader = DBReader(
        connection=kafka,
        source="topic_name",
        hwm=DBReader.AutoDetectHWM(name="kafka_hwm", expression="offset"),
    )

    with IncrementalStrategy():
        read_df = reader.run()

    # parse JSON format to Spark struct
    json = JSON()
    schema = StructType(
        [
            StructField("name", StringType(), nullable=True),
            StructField("age", IntegerType(), nullable=True),
        ],
    )
    deserialized_df = read_df.select(
        # cast binary key to string
        decode("key", "UTF-8").alias("key"),
        json.parse_column("value", json),
    )

Опции

onetl.connection.db_connection.kafka.options.KafkaReadOptions

Bases: GenericOptions

Reading options for Kafka connector.

Warning

Options: * assign * endingOffsets * endingOffsetsByTimestamp * kafka.* * startingOffsets * startingOffsetsByTimestamp * startingTimestamp * subscribe * subscribePattern

are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.

Added in 0.9.0

Examples

Note

You can pass any value supported by connector, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on connector version.

from onetl.connection import Kafka

options = Kafka.ReadOptions(
    includeHeaders=False,
    minPartitions=50,
)
Source code in onetl/connection/db_connection/kafka/options.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class KafkaReadOptions(GenericOptions):
    """Reading options for Kafka connector.

    !!! warning

        Options:
            * `assign`
            * `endingOffsets`
            * `endingOffsetsByTimestamp`
            * `kafka.*`
            * `startingOffsets`
            * `startingOffsetsByTimestamp`
            * `startingTimestamp`
            * `subscribe`
            * `subscribePattern`

        are populated from connection attributes,
        and cannot be overridden by the user in `ReadOptions` to avoid issues.

    !!! success "Added in 0.9.0"

    Examples
    --------

    !!! note

        You can pass any value
        [supported by connector](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),
        even if it is not mentioned in this documentation. **Option names should be in** `camelCase`!

        The set of supported options depends on connector version.

    ```python
    from onetl.connection import Kafka

    options = Kafka.ReadOptions(
        includeHeaders=False,
        minPartitions=50,
    )
    ```
    """

    include_headers: bool = Field(default=False, alias="includeHeaders")
    """
    If `True`, add `headers` column to output DataFrame.

    If `False`, column will not be added.
    """

    class Config:
        prohibited_options = PROHIBITED_OPTIONS
        known_options = KNOWN_READ_OPTIONS
        extra = "allow"

include_headers = Field(default=False, alias='includeHeaders') class-attribute instance-attribute

If True, add headers column to output DataFrame.

If False, column will not be added.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.

Otherwise, an exception will be raised

Source code in onetl/impl/generic_options.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@classmethod
def parse(
    cls,
    options: GenericOptions | dict | None,
) -> Self:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        msg = f"{options.__class__.__name__} is not a {cls.__name__} instance"
        raise TypeError(msg)

    return options