Чтение из MongoDB с использованием DBReader
DBReader поддерживает стратегии для инкрементального чтения данных, но не поддерживает пользовательские пайплайны, например, агрегацию.
Warning
Пожалуйста, учитывайте типы данных MongoDB
Поддерживаемые функции DBReader
- ❌
columns(на данный момент читаются все поля документа) - ✅︎
where(передается в пайплайн агрегации{"$match": ...}) - ✅︎
hwm, поддерживаемые стратегии: - ✅︎ Snapshot
- ✅︎ Incremental
- ✅︎ Snapshot batch
- ✅︎ Incremental batch
- Обратите внимание, что поле
expressionдля HWM может быть только именем поля, а не пользовательским выражением - ✅︎
hint(см. официальную документацию) - ✅︎
df_schema(обязательно) - ✅︎
options(см. MongoDB.ReadOptions)
Примеры
Стратегия Snapshot:
```python
from onetl.connection import MongoDB
from onetl.db import DBReader
from pyspark.sql.types import (
StructType,
StructField,
IntegerType,
StringType,
TimestampType,
)
mongodb = MongoDB(...)
# обязательно
df_schema = StructType(
[
StructField("_id", StringType()),
StructField("some", StringType()),
StructField(
"field",
StructType(
[
StructField("nested", IntegerType()),
],
),
),
StructField("updated_dt", TimestampType()),
]
)
reader = DBReader(
connection=mongodb,
source="some_collection",
df_schema=df_schema,
where={"field": {"$eq": 123}},
hint={"field": 1},
options=MongoDBReadOptions(batchSize=10000),
)
df = reader.run()
```
Стратегия Incremental:
```python
from onetl.connection import MongoDB
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
from pyspark.sql.types import (
StructType,
StructField,
IntegerType,
StringType,
TimestampType,
)
mongodb = MongoDB(...)
# обязательно
df_schema = StructType(
[
StructField("_id", StringType()),
StructField("some", StringType()),
StructField(
"field",
StructType(
[
StructField("nested", IntegerType()),
],
),
),
StructField("updated_dt", TimestampType()),
]
)
reader = DBReader(
connection=mongodb,
source="some_collection",
df_schema=df_schema,
where={"field": {"$eq": 123}},
hint={"field": 1},
hwm=DBReader.AutoDetectHWM(name="mongodb_hwm", expression="updated_dt"),
options=MongoDBReadOptions(batchSize=10000),
)
with IncrementalStrategy():
df = reader.run()
```
Рекомендации
Обратите внимание на значение where
Вместо фильтрации данных на стороне Spark с использованием df.filter(df.column == 'value') передавайте соответствующее условие DBReader(where={"column": {"$eq": "value"}}).
Это уменьшает объем данных, передаваемых из MongoDB в Spark, и может повысить производительность запроса.
Особенно если есть индексы для столбцов, используемых в условии where.
Параметры чтения
onetl.connection.db_connection.mongodb.options.MongoDBReadOptions
Bases: GenericOptions
Reading options for MongoDB connector.
.. warning::
Options ``uri``, ``database``, ``collection``, ``pipeline``, ``hint`` are populated from connection
attributes, and cannot be overridden by the user in ``ReadOptions`` to avoid issues.
.. versionadded:: 0.7.0
Examples
.. note ::
You can pass any value
`supported by connector <https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!
The set of supported options depends on connector version.
.. code:: python
from onetl.connection import MongoDB
options = MongoDB.ReadOptions(
sampleSize=100,
)
Source code in onetl/connection/db_connection/mongodb/options.py
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | |