
Reading from MongoDB using MongoDB.pipeline

MongoDB.pipeline allows passing a custom pipeline, but does not support incremental strategies.

Warning

Please take MongoDB data types into account.

Recommendations

Pay attention to the pipeline value

Instead of filtering data on the Spark side with df.filter(df.column == 'value'), put the filter into the pipeline value: connection.pipeline(..., pipeline={"$match": {"column": {"$eq": "value"}}}). This reduces the amount of data transferred from MongoDB to Spark and may also improve query performance, especially if the fields used in the pipeline value are indexed.
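The recommendation above can be sketched as follows. This is a minimal illustration, assuming a hypothetical helper `equality_match` (not part of onETL) that builds the `$match` stage equivalent to a Spark-side equality filter. The JSON serialization mirrors the `json.dumps(pipeline)` call in the method source below, where the result is passed to the connector as the `aggregation.pipeline` option; any preprocessing done by `dialect.prepare_pipeline` is not reproduced here.

```python
import json


# Hypothetical helper (not part of onETL): build a "$match" stage equivalent
# to the Spark-side filter df.filter(df[column] == value).
def equality_match(column: str, value) -> dict:
    return {"$match": {column: {"$eq": value}}}


# This dict is what would be passed as connection.pipeline(..., pipeline=stage).
stage = equality_match("column", "value")

# Serialized form handed to the Spark connector, so MongoDB applies the filter
# before any document reaches Spark.
serialized = json.dumps(stage)
print(serialized)  # {"$match": {"column": {"$eq": "value"}}}
```

With the filter in the pipeline, documents that do not match are never sent to Spark at all, whereas `df.filter` discards them only after they have been transferred.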

References

onetl.connection.db_connection.mongodb.connection.MongoDB.pipeline(collection, pipeline=None, df_schema=None, options=None)

Execute a pipeline for a specific collection and return a DataFrame. Supports hooks.

Almost like the Aggregation pipeline syntax in MongoDB:

db.collection_name.aggregate([{"$match": ...}, {"$group": ...}])

but the pipeline is executed on Spark executors, in a distributed way.
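The `pipeline` parameter accepts either a single stage (`dict`) or several stages (`list[dict]`), much like the array passed to `aggregate()`. The sketch below only shows the two shapes and their JSON form; the `category` field is hypothetical, and the serialization assumes the same `json.dumps` step used in the method source below.

```python
import json

# A single stage may be passed as a dict...
single_stage = {"$match": {"column_int": {"$gt": 0}}}

# ...and several stages as a list of dicts, applied in list order,
# as in db.collection_name.aggregate([...]).
multi_stage = [
    {"$match": {"column_int": {"$gt": 0}}},
    # "category" is a hypothetical field used only for illustration
    {"$group": {"_id": "$category", "total": {"$sum": "$column_int"}}},
]

# The connector's "aggregation.pipeline" option accepts either a single JSON
# document or a JSON array of documents, so both forms serialize to a valid value.
option_single = json.dumps(single_stage)
option_multi = json.dumps(multi_stage)
```

Stage order matters: putting `$match` first lets MongoDB discard documents before grouping them.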

Note

This method does not support strategies; use DBReader instead.

Added in 0.7.0

Parameters
collection : str

Collection name.

pipeline : dict | list[dict], optional

Pipeline containing a database query. See Aggregation pipeline syntax.

df_schema : StructType, optional

Schema describing the resulting DataFrame.

options : PipelineOptions | dict, optional

Additional pipeline options, see MongoDB.PipelineOptions.

Examples

Get documents with a specific field value:

df = connection.pipeline(
    collection="collection_name",
    pipeline={"$match": {"field": {"$eq": 1}}},
)

Calculate an aggregation and get the result:

df = connection.pipeline(
    collection="collection_name",
    pipeline={
        "$group": {
            "_id": 1,
            "min": {"$min": "$column_int"},
            "max": {"$max": "$column_int"},
        }
    },
)
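To make the semantics of the `$group` stage above concrete, here is a plain-Python illustration over a few made-up documents. It only mimics the result; MongoDB evaluates the real stage. A constant `_id` puts every document into one group, and `$min`/`$max` skip documents where the field is missing or null.

```python
# Plain-Python illustration only: MongoDB, not Python, evaluates the real stage.
documents = [
    {"_id": "a", "column_int": 5},
    {"_id": "b", "column_int": -3},
    {"_id": "c"},  # field missing: $min/$max ignore missing and null values
    {"_id": "d", "column_int": 12},
]

# A constant "_id" (here 1) puts every document into a single group.
values = [doc["column_int"] for doc in documents if doc.get("column_int") is not None]
result = {"_id": 1, "min": min(values), "max": max(values)}
print(result)  # {'_id': 1, 'min': -3, 'max': 12}
```

For this reason the DataFrame returned by the example would contain a single row with the `_id`, `min` and `max` columns.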
Explicitly pass a DataFrame schema:

from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some_string", StringType()),
        StructField("some_int", IntegerType()),
        StructField("some_datetime", TimestampType()),
        StructField("some_float", DoubleType()),
    ],
)

df = connection.pipeline(
    collection="collection_name",
    df_schema=df_schema,
    pipeline={"$match": {"some_int": {"$gt": 999}}},
)

Pass additional options to pipeline execution:

df = connection.pipeline(
    collection="collection_name",
    pipeline={"$match": {"field": {"$eq": 1}}},
    options=MongoDB.PipelineOptions(hint={"field": 1}),
)

Source code in onetl/connection/db_connection/mongodb/connection.py
@slot
def pipeline(
    self,
    collection: str,
    pipeline: dict | list[dict] | None = None,
    df_schema: StructType | None = None,
    options: MongoDBPipelineOptions | dict | None = None,
):
    """
    Execute a pipeline for a specific collection, and return DataFrame. [![support hooks](https://img.shields.io/badge/%20-support%20hooks-blue)](/hooks/)

    Almost like [Aggregation pipeline syntax](https://www.mongodb.com/docs/manual/core/aggregation-pipeline/)
    in MongoDB:

    ```js
    db.collection_name.aggregate([{"$match": ...}, {"$group": ...}])
    ```
    but pipeline is executed on Spark executors, in a distributed way.

    !!! note

        This method does not support [strategy][],
        use [DBReader][onetl.db.db_reader.db_reader.DBReader] instead

    !!! success "Added in 0.7.0"

    Parameters
    ----------

    collection : str
        Collection name.

    pipeline : dict | list[dict], optional
        Pipeline containing a database query.
        See [Aggregation pipeline syntax](https://www.mongodb.com/docs/manual/core/aggregation-pipeline/).

    df_schema : StructType, optional
        Schema describing the resulting DataFrame.

    options : PipelineOptions | dict, optional
        Additional pipeline options,
        see [MongoDB.PipelineOptions][onetl.connection.db_connection.mongodb.options.MongoDBPipelineOptions].

    Examples
    --------

    Get document with a specific `field` value:

    ```python
    df = connection.pipeline(
        collection="collection_name",
        pipeline={"$match": {"field": {"$eq": 1}}},
    )
    ```
    Calculate aggregation and get result:

    ```python
    df = connection.pipeline(
        collection="collection_name",
        pipeline={
            "$group": {
                "_id": 1,
                "min": {"$min": "$column_int"},
                "max": {"$max": "$column_int"},
            }
        },
    )
    ```
    Explicitly pass DataFrame schema:

    ```python
    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    df_schema = StructType(
        [
            StructField("_id", StringType()),
            StructField("some_string", StringType()),
            StructField("some_int", IntegerType()),
            StructField("some_datetime", TimestampType()),
            StructField("some_float", DoubleType()),
        ],
    )

    df = connection.pipeline(
        collection="collection_name",
        df_schema=df_schema,
        pipeline={"$match": {"some_int": {"$gt": 999}}},
    )
    ```
    Pass additional options to pipeline execution:

    ```python
    df = connection.pipeline(
        collection="collection_name",
        pipeline={"$match": {"field": {"$eq": 1}}},
        options=MongoDB.PipelineOptions(hint={"field": 1}),
    )
    ```
    """
    log.info("|%s| Executing aggregation pipeline:", self.__class__.__name__)

    read_options = self.PipelineOptions.parse(options).dict(by_alias=True, exclude_none=True)
    if pipeline:
        pipeline = self.dialect.prepare_pipeline(pipeline)

    log_with_indent(log, "collection = %r", collection)
    log_json(log, pipeline, name="pipeline")

    if df_schema:
        empty_df = self.spark.createDataFrame([], df_schema)
        log_dataframe_schema(log, empty_df)

    log_options(log, read_options)

    # exclude from the log
    read_options.update(self._get_connection_params(collection))
    if pipeline:
        read_options["aggregation.pipeline"] = json.dumps(pipeline)

    with override_job_description(self.spark, f"{self}.pipeline()"):
        spark_reader = self.spark.read.format("mongodb").options(**read_options)

        if df_schema:
            spark_reader = spark_reader.schema(df_schema)

        return spark_reader.load()

onetl.connection.db_connection.mongodb.options.MongoDBPipelineOptions

Bases: GenericOptions

Aggregation pipeline options for MongoDB connector.

The only difference from MongoDB.ReadOptions is that the latter does not allow passing the hint parameter.

Warning

The options uri, database, collection and pipeline are populated from connection attributes and cannot be overridden by the user in PipelineOptions, to avoid conflicts with the values set by the connection.
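The restriction above can be pictured as a simple key check. This is a hypothetical sketch, not onETL's implementation: the real class declares `prohibited_options = PIPELINE_PROHIBITED_OPTIONS` in its `Config`, and the exact contents of that constant are not shown here, so the set below simply reuses the four names from the warning.

```python
# Hypothetical sketch (not onETL's code): reject user options that would
# override values derived from the connection itself.
PROHIBITED = frozenset({"uri", "database", "collection", "pipeline"})


def check_pipeline_options(options: dict) -> dict:
    clashes = sorted(PROHIBITED.intersection(options))
    if clashes:
        raise ValueError(f"Options {clashes} cannot be passed to PipelineOptions")
    return options


check_pipeline_options({"hint": {"some_field": 1}})  # accepted

try:
    check_pipeline_options({"hint": {"some_field": 1}, "collection": "other"})
except ValueError as error:
    print(error)  # Options ['collection'] cannot be passed to PipelineOptions
```

Failing early like this surfaces the conflict when options are built, rather than silently reading from a different collection or database.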

Added in 0.7.0

Examples

Note

You can pass any value supported by the connector, even if it is not mentioned in this documentation. Option names should be in camelCase!

The set of supported options depends on the connector version.

from onetl.connection import MongoDB

options = MongoDB.PipelineOptions(
    hint={"some_field": 1},
)
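Since option names must be in camelCase, a small conversion helper can avoid typos when options are kept as snake_case keys elsewhere. The helper `to_camel_case` below is hypothetical and not part of onETL; `sampleSize` is used as an example because it is a read option of the MongoDB Spark connector (used for schema inference). Because `PipelineOptions` allows extra options (`extra = "allow"` in its `Config`), such a dict could presumably be unpacked into `MongoDB.PipelineOptions(**user_options)`.

```python
# Hypothetical helper, not part of onETL: converts a snake_case name into the
# camelCase spelling the connector expects, keeping the first word lowercase.
def to_camel_case(name: str) -> str:
    head, *rest = name.split("_")
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


user_options = {to_camel_case("sample_size"): 1000}
print(user_options)  # {'sampleSize': 1000}
```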
Source code in onetl/connection/db_connection/mongodb/options.py
class MongoDBPipelineOptions(GenericOptions):
    """Aggregation pipeline options for MongoDB connector.

    The only difference from [MongoDB.ReadOptions][MongoDBReadOptions]
    that latter does not allow to pass the `hint` parameter.

    !!! warning

        Options `uri`, `database`, `collection`, `pipeline` are populated from connection attributes,
        and cannot be overridden by the user in `PipelineOptions` to avoid issues.

    !!! success "Added in 0.7.0"

    Examples
    --------

    !!! note

        You can pass any value
        [supported by connector](https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/),
        even if it is not mentioned in this documentation. **Option names should be in** `camelCase`!

        The set of supported options depends on connector version.

    ```python
    from onetl.connection import MongoDB

    options = MongoDB.PipelineOptions(
        hint={"some_field": 1},
    )
    ```
    """

    class Config:
        prohibited_options = PIPELINE_PROHIBITED_OPTIONS
        known_options = KNOWN_READ_OPTIONS
        extra = "allow"