Skip to content

Чтение из MongoDB с использованием DBReader

DBReader поддерживает стратегии для инкрементального чтения данных, но не поддерживает пользовательские пайплайны, например, агрегацию.

Warning

Пожалуйста, учитывайте типы данных MongoDB

Поддерживаемые функции DBReader

  • columns (на данный момент читаются все поля документа)
  • ✅︎ where (передается в пайплайн агрегации {"$match": ...})
  • ✅︎ hwm, поддерживаемые стратегии:
  • ✅︎ Snapshot
  • ✅︎ Incremental
  • ✅︎ Snapshot batch
  • ✅︎ Incremental batch
  • Обратите внимание, что поле expression для HWM может быть только именем поля, а не пользовательским выражением
  • ✅︎ hint (см. официальную документацию)
  • ✅︎ df_schema (обязательно)
  • ✅︎ options (см. MongoDB.ReadOptions)

Примеры

Стратегия Snapshot:

```python
    from onetl.connection import MongoDB
    from onetl.db import DBReader

    from pyspark.sql.types import (
        StructType,
        StructField,
        IntegerType,
        StringType,
        TimestampType,
    )

    mongodb = MongoDB(...)

    # обязательно
    df_schema = StructType(
        [
            StructField("_id", StringType()),
            StructField("some", StringType()),
            StructField(
                "field",
                StructType(
                    [
                        StructField("nested", IntegerType()),
                    ],
                ),
            ),
            StructField("updated_dt", TimestampType()),
        ]
    )

    reader = DBReader(
        connection=mongodb,
        source="some_collection",
        df_schema=df_schema,
        where={"field": {"$eq": 123}},
        hint={"field": 1},
        options=MongoDBReadOptions(batchSize=10000),
    )
    df = reader.run()
```

Стратегия Incremental:

```python
    from onetl.connection import MongoDB
    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy

    from pyspark.sql.types import (
        StructType,
        StructField,
        IntegerType,
        StringType,
        TimestampType,
    )

    mongodb = MongoDB(...)

    # обязательно
    df_schema = StructType(
        [
            StructField("_id", StringType()),
            StructField("some", StringType()),
            StructField(
                "field",
                StructType(
                    [
                        StructField("nested", IntegerType()),
                    ],
                ),
            ),
            StructField("updated_dt", TimestampType()),
        ]
    )

    reader = DBReader(
        connection=mongodb,
        source="some_collection",
        df_schema=df_schema,
        where={"field": {"$eq": 123}},
        hint={"field": 1},
        hwm=DBReader.AutoDetectHWM(name="mongodb_hwm", expression="updated_dt"),
        options=MongoDBReadOptions(batchSize=10000),
    )

    with IncrementalStrategy():
        df = reader.run()
```

Рекомендации

Обратите внимание на значение where

Вместо фильтрации данных на стороне Spark с использованием df.filter(df.column == 'value') передавайте соответствующее условие DBReader(where={"column": {"$eq": "value"}}). Это уменьшает объем данных, передаваемых из MongoDB в Spark, и может повысить производительность запроса. Особенно если есть индексы для столбцов, используемых в условии where.

Параметры чтения

onetl.connection.db_connection.mongodb.options.MongoDBReadOptions

Bases: GenericOptions

Reading options for MongoDB connector.

.. warning::

Options ``uri``, ``database``, ``collection``, ``pipeline``, ``hint`` are populated from connection
attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.

.. versionadded:: 0.7.0

Examples

.. note ::

You can pass any value
`supported by connector <https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

The set of supported options depends on connector version.

.. code:: python

from onetl.connection import MongoDB

options = MongoDB.ReadOptions(
    sampleSize=100,
)
Source code in onetl/connection/db_connection/mongodb/options.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class MongoDBReadOptions(GenericOptions):
    """Reading options for MongoDB connector.

    .. warning::

        Options ``uri``, ``database``, ``collection``, ``pipeline``, ``hint`` are populated from connection
        attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.

    .. versionadded:: 0.7.0

    Examples
    --------

    .. note ::

        You can pass any value
        `supported by connector <https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/>`_,
        even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

        The set of supported options depends on connector version.

    .. code:: python

        from onetl.connection import MongoDB

        options = MongoDB.ReadOptions(
            sampleSize=100,
        )
    """

    class Config:
        prohibited_options = PROHIBITED_OPTIONS
        known_options = KNOWN_READ_OPTIONS
        extra = "allow"