
Reading from Kafka

Data can be read from Kafka into Spark using DBReader. An incremental reading strategy is also supported.

Supported DBReader features

  • columns (not supported by Kafka)
  • where (not supported by Kafka)
  • ✅︎ hwm, supported strategies:
      • ✅︎ Snapshot
      • ✅︎ Incremental
      • Snapshot batch (not supported by Kafka)
      • Incremental batch (not supported by Kafka)
  • hint (not supported by Kafka)
  • df_schema (see note below)
  • ✅︎ options (see Kafka.ReadOptions)

DataFrame schema

Unlike other DB connections, Kafka has no concept of columns. All topic messages share the same set of fields, with the structure shown below:

```text
    root
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = false)
    |-- partition: integer (nullable = false)
    |-- offset: long (nullable = false)
    |-- timestamp: timestamp (nullable = false)
    |-- timestampType: integer (nullable = false)
    |-- headers: array (nullable = true)
        |-- element: struct (containsNull = true)
            |-- key: string (nullable = false)
            |-- value: binary (nullable = true)
```

The headers field is present in the DataFrame only if the Kafka.ReadOptions(include_headers=True) option is passed (Kafka 1.x compatibility).
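Each entry in the headers column is a key/value pair with a string key and a binary value. As a plain-Python sketch (outside Spark, with made-up header data) of how such pairs are typically decoded:

```python
# Hypothetical header pairs as they might appear in a single Kafka message:
# string keys, binary (bytes) values
raw_headers = [
    ("trace-id", b"abc-123"),
    ("content-type", b"application/json"),
]

# decode binary header values to text where the payload is known to be UTF-8
decoded = {key: value.decode("utf-8") for key, value in raw_headers}
```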

Value deserialization

To read a value or key of a type other than bytes (for example, a struct or an integer), users have to deserialize the values manually.

This can be done with the following methods:

  • Avro.parse_column
  • JSON.parse_column
  • CSV.parse_column
  • XML.parse_column
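To illustrate what such deserialization does per message, here is a plain-Python sketch (without Spark) that turns a raw JSON value payload, as delivered by Kafka in bytes, into a structured record:

```python
import json

# a raw Kafka message value: binary data holding a JSON document
raw_value = b'{"name": "Alice", "age": 30}'

# deserialize bytes into a structured record; JSON.parse_column performs
# the equivalent transformation on a whole DataFrame column at once
record = json.loads(raw_value.decode("utf-8"))
```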

Examples

Snapshot strategy, value is Avro binary data:

```python
    from onetl.connection import Kafka
    from onetl.db import DBReader
    from onetl.file.format import Avro
    from pyspark.sql.functions import decode

    # read all topic data from Kafka
    kafka = Kafka(...)
    reader = DBReader(connection=kafka, source="avro_topic")
    read_df = reader.run()

    # parse Avro format to Spark struct
    avro = Avro(
        schema_dict={
            "type": "record",
            "name": "Person",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        }
    )
    deserialized_df = read_df.select(
        # cast binary key to string
        decode("key", "UTF-8").alias("key"),
        avro.parse_column("value"),
    )
```

Incremental strategy, value is a JSON string:

Note

Currently the Kafka connector supports only HWM based on the offset field. Other fields, such as timestamp, are not supported yet.

```python
    from onetl.connection import Kafka
    from onetl.db import DBReader
    from onetl.file.format import JSON
    from onetl.strategy import IncrementalStrategy
    from pyspark.sql.functions import decode
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    kafka = Kafka(...)

    # read only new data from Kafka topic
    reader = DBReader(
        connection=kafka,
        source="topic_name",
        hwm=DBReader.AutoDetectHWM(name="kafka_hwm", expression="offset"),
    )

    with IncrementalStrategy():
        read_df = reader.run()

    # parse JSON format to Spark struct
    json = JSON()
    schema = StructType(
        [
            StructField("name", StringType(), nullable=True),
            StructField("age", IntegerType(), nullable=True),
        ],
    )
    deserialized_df = read_df.select(
        # cast binary key to string
        decode("key", "UTF-8").alias("key"),
        json.parse_column("value", schema),
    )
```
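The offset-based HWM bookkeeping behind IncrementalStrategy can be sketched in plain Python (a simplified model, not onetl's actual implementation): the highest consumed offset is stored per partition, and a subsequent run keeps only messages above it:

```python
# messages as (partition, offset, value) tuples; made-up sample data
messages = [
    (0, 10, "a"), (0, 11, "b"),
    (1, 5, "c"), (1, 6, "d"),
]

# HWM state saved after the previous run: highest offset read per partition
hwm = {0: 10, 1: 6}

# an incremental run keeps only messages past the stored watermark
new_messages = [m for m in messages if m[1] > hwm.get(m[0], -1)]

# update the watermark after reading, so the next run skips these messages
for partition, offset, _ in new_messages:
    hwm[partition] = max(hwm.get(partition, -1), offset)
```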

Options

onetl.connection.db_connection.kafka.options.KafkaReadOptions

Bases: GenericOptions

Reading options for Kafka connector.

Warning

The following options:

  • assign
  • endingOffsets
  • endingOffsetsByTimestamp
  • kafka.*
  • startingOffsets
  • startingOffsetsByTimestamp
  • startingTimestamp
  • subscribe
  • subscribePattern

are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.

Added in version 0.9.0.

Examples

Note

You can pass any value supported by the connector (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html), even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on connector version.

```python
from onetl.connection import Kafka

options = Kafka.ReadOptions(
    includeHeaders=False,
    minPartitions=50,
)
```
Source code in onetl/connection/db_connection/kafka/options.py
```python
class KafkaReadOptions(GenericOptions):
    """Reading options for Kafka connector.

    .. warning::

        Options:
            * ``assign``
            * ``endingOffsets``
            * ``endingOffsetsByTimestamp``
            * ``kafka.*``
            * ``startingOffsets``
            * ``startingOffsetsByTimestamp``
            * ``startingTimestamp``
            * ``subscribe``
            * ``subscribePattern``

        are populated from connection attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.

    .. versionadded:: 0.9.0

    Examples
    --------

    .. note ::

        You can pass any value
        `supported by connector <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on connector version.

    .. code:: python

        from onetl.connection import Kafka

        options = Kafka.ReadOptions(
            includeHeaders=False,
            minPartitions=50,
        )
    """

    include_headers: bool = Field(default=False, alias="includeHeaders")
    """
    If ``True``, add ``headers`` column to output DataFrame.

    If ``False``, column will not be added.
    """

    class Config:
        prohibited_options = PROHIBITED_OPTIONS
        known_options = KNOWN_READ_OPTIONS
        extra = "allow"
```

include_headers: bool = Field(default=False, alias="includeHeaders")

If True, add headers column to output DataFrame.

If False, column will not be added.

parse(options) classmethod

If a parameter inherited from the ReadOptions class was passed, it will be returned unchanged. If a dict object was passed, it will be converted to ReadOptions.

Otherwise, an exception will be raised.

Source code in onetl/impl/generic_options.py
```python
@classmethod
def parse(
    cls: type[T],
    options: GenericOptions | dict | None,
) -> T:
    """
    If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged.
    If a Dict object was passed it will be converted to ReadOptions.

    Otherwise, an exception will be raised
    """

    if not options:
        return cls()

    if isinstance(options, dict):
        return cls.parse_obj(options)

    if not isinstance(options, cls):
        raise TypeError(
            f"{options.__class__.__name__} is not a {cls.__name__} instance",
        )

    return options
```
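The dispatch logic of parse() can be demonstrated with a simplified, self-contained stand-in class (a sketch of the same behavior, not onetl's actual GenericOptions): None yields a default instance, a dict is converted, an existing instance passes through unchanged, and anything else raises TypeError.

```python
class Options:
    """Simplified stand-in for GenericOptions, illustrating parse() dispatch."""

    def __init__(self, **kwargs):
        self.values = kwargs

    @classmethod
    def parse(cls, options):
        # None or empty input -> default instance
        if not options:
            return cls()
        # dict -> converted to an instance
        if isinstance(options, dict):
            return cls(**options)
        # wrong type -> error
        if not isinstance(options, cls):
            raise TypeError(
                f"{options.__class__.__name__} is not a {cls.__name__} instance",
            )
        # already an instance -> returned unchanged
        return options


# dict input is converted to an Options instance
parsed = Options.parse({"includeHeaders": True})

# an existing instance passes through unchanged
same = Options.parse(parsed)
```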