Skip to content

Чтение из MySQL с использованием DBReader

DBReader поддерживает стратегии для инкрементального чтения данных, но не поддерживает нестандартные запросы, такие как JOIN.

Warning

Обратите внимание на типы данных MySQL

Поддерживаемые функции DBReader

Примеры

Стратегия Snapshot:

```python
    from onetl.connection import MySQL
    from onetl.db import DBReader

    mysql = MySQL(...)

    reader = DBReader(
        connection=mysql,
        source="schema.table",
        columns=["id", "key", "CAST(value AS text) value", "updated_dt"],
        where="key = 'something'",
        hint="SKIP_SCAN(schema.table key_index)",
        options=MySQL.ReadOptions(partitionColumn="id", numPartitions=10),
    )
    df = reader.run()   
```

Стратегия Incremental:

```python
    from onetl.connection import MySQL
    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy

    mysql = MySQL(...)

    reader = DBReader(
        connection=mysql,
        source="schema.table",
        columns=["id", "key", "CAST(value AS text) value", "updated_dt"],
        where="key = 'something'",
        hint="SKIP_SCAN(schema.table key_index)",
        hwm=DBReader.AutoDetectHWM(name="mysql_hwm", expression="updated_dt"),
        options=MySQL.ReadOptions(partitionColumn="id", numPartitions=10),
    )

    with IncrementalStrategy():
        df = reader.run()  
```

Рекомендации

Выбирайте только необходимые столбцы

Вместо передачи "*" в DBReader(columns=[...]) предпочтительно передавать точные имена столбцов. Это уменьшает объем данных, передаваемых из Oracle в Spark.

Обратите внимание на значение where

Вместо фильтрации данных на стороне Spark с использованием df.filter(df.column == 'value'), передайте соответствующее выражение DBReader(where="column = 'value'"). Это не только уменьшает объем передаваемых данных из Oracle в Spark, но и может улучшить производительность запроса. Особенно если для столбцов, используемых в условии where, существуют индексы.

Опции

onetl.connection.db_connection.mysql.options.MySQLReadOptions

Bases: JDBCReadOptions

Source code in onetl/connection/db_connection/mysql/options.py
16
17
class MySQLReadOptions(JDBCReadOptions):
    __doc__ = JDBCReadOptions.__doc__.replace("SomeDB", "MySQL")  # type: ignore[assignment, union-attr]

__doc__ = JDBCReadOptions.__doc__.replace('SomeDB', 'MySQL') class-attribute instance-attribute