Skip to content

Чтение из MongoDB с использованием DBReader

DBReader поддерживает стратегии для инкрементального чтения данных, но не поддерживает пользовательские пайплайны, например, агрегацию.

Warning

Пожалуйста, учитывайте типы данных MongoDB

Поддерживаемые функции DBReader

  • columns (на данный момент читаются все поля документа)
  • ✅︎ where (передается в пайплайн агрегации {"$match": ...})
  • ✅︎ hwm, поддерживаемые стратегии:
  • ✅︎ Snapshot
  • ✅︎ Incremental
  • ✅︎ Snapshot batch
  • ✅︎ Incremental batch
  • Обратите внимание, что поле expression для HWM может быть только именем поля, а не пользовательским выражением
  • ✅︎ hint (см. официальную документацию)
  • ✅︎ df_schema (обязательно)
  • ✅︎ options (см. MongoDB.ReadOptions)

Примеры

Стратегия Snapshot:

    from onetl.connection import MongoDB
    from onetl.db import DBReader
    from pyspark.sql.types import (
        StructType,
        StructField,
        IntegerType,
        StringType,
        TimestampType,
    )
    mongodb = MongoDB(...)
    # обязательно
    df_schema = StructType(
        [
            StructField("_id", StringType()),
            StructField("some", StringType()),
            StructField(
                "field",
                StructType(
                    [
                        StructField("nested", IntegerType()),
                    ],
                ),
            ),
            StructField("updated_dt", TimestampType()),
        ]
    )
    reader = DBReader(
        connection=mongodb,
        source="some_collection",
        df_schema=df_schema,
        where={"field": {"$eq": 123}},
        hint={"field": 1},
        options=MongoDBReadOptions(batchSize=10000),
    )
    df = reader.run()

Стратегия Incremental:

    from onetl.connection import MongoDB
    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy
    from pyspark.sql.types import (
        StructType,
        StructField,
        IntegerType,
        StringType,
        TimestampType,
    )
    mongodb = MongoDB(...)
    # обязательно
    df_schema = StructType(
        [
            StructField("_id", StringType()),
            StructField("some", StringType()),
            StructField(
                "field",
                StructType(
                    [
                        StructField("nested", IntegerType()),
                    ],
                ),
            ),
            StructField("updated_dt", TimestampType()),
        ]
    )
    reader = DBReader(
        connection=mongodb,
        source="some_collection",
        df_schema=df_schema,
        where={"field": {"$eq": 123}},
        hint={"field": 1},
        hwm=DBReader.AutoDetectHWM(name="mongodb_hwm", expression="updated_dt"),
        options=MongoDBReadOptions(batchSize=10000),
    )
    with IncrementalStrategy():
        df = reader.run()

Рекомендации

Обратите внимание на значение where

Вместо фильтрации данных на стороне Spark с использованием df.filter(df.column == 'value') передавайте соответствующее условие DBReader(where={"column": {"$eq": "value"}}). Это уменьшает объем данных, передаваемых из MongoDB в Spark, и может повысить производительность запроса. Особенно если есть индексы для столбцов, используемых в условии where.

Параметры чтения

onetl.connection.db_connection.mongodb.options.MongoDBReadOptions

Bases: GenericOptions

Reading options for MongoDB connector.

Warning

Options uri, database, collection, pipeline, hint are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.

Added in 0.7.0

Examples

Note

You can pass any value supported by connector, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on connector version.

from onetl.connection import MongoDB

options = MongoDB.ReadOptions(
    sampleSize=100,
)
Source code in onetl/connection/db_connection/mongodb/options.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class MongoDBReadOptions(GenericOptions):
    """Reading options for MongoDB connector.

    !!! warning

        Options `uri`, `database`, `collection`, `pipeline`, `hint` are populated from connection
        attributes, and cannot be overridden by the user in `ReadOptions` to avoid issues.

    !!! success "Added in 0.7.0"

    Examples
    --------

    !!! note

        You can pass any value
        [supported by connector](https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/),
        even if it is not mentioned in this documentation. **Option names should be in** `camelCase`!

        The set of supported options depends on connector version.

    ```python
    from onetl.connection import MongoDB

    options = MongoDB.ReadOptions(
        sampleSize=100,
    )
    ```
    """

    class Config:
        prohibited_options = PROHIBITED_OPTIONS
        known_options = KNOWN_READ_OPTIONS
        extra = "allow"