Logging

Logging is essential for understanding what happens under the hood of onETL.

The default logging level of the Python interpreter is WARNING, but most onETL logs are emitted at the INFO level, so by default users see very little.
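The effect of the default level can be reproduced with the standard library alone. The sketch below is only an illustration: the logger name `demo.library` is hypothetical and stands in for any library that logs at INFO, and the root level is set to WARNING explicitly (Python's default) so the result is reproducible.

```python
import io
import logging

# Python's default root level, set explicitly so the sketch is reproducible
logging.getLogger().setLevel(logging.WARNING)

# Hypothetical logger standing in for a library that logs at INFO
logger = logging.getLogger("demo.library")
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))
logger.propagate = False  # keep the demo output out of the root handlers

# The logger has no level of its own, so it inherits WARNING from the root
logger.info("reading table")      # dropped
logger.warning("option ignored")  # emitted

# Lowering the level makes INFO records visible
logger.setLevel(logging.INFO)
logger.info("reading table again")  # emitted

captured = stream.getvalue().splitlines()
print(captured)
```

Only the warning and the second INFO message end up in `captured`; the first INFO message is filtered out before it reaches any handler.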

To change the logging level, call the setup_logging function at the beginning of the script:

from onetl.log import setup_logging
from other.lib import some, more, imports

setup_logging()

# rest of the code
...

This changes both the logging level and the log format to something like this:

See logs
    2024-04-12 10:12:10,834 [INFO    ] MainThread: |onETL| Using IncrementalStrategy as a strategy
    2024-04-12 10:12:10,835 [INFO    ] MainThread: =================================== DBReader.run() starts ===================================
    2024-04-12 10:12:10,835 [INFO    ] MainThread: |DBReader| Getting Spark type for HWM expression: 'updated_at'
    2024-04-12 10:12:10,836 [INFO    ] MainThread: |MSSQL| Fetching schema of table 'source_schema.table' ...
    2024-04-12 10:12:11,636 [INFO    ] MainThread: |MSSQL| Schema fetched.
    2024-04-12 10:12:11,642 [INFO    ] MainThread: |DBReader| Got Spark field: StructField('updated_at', TimestampType(), True)
    2024-04-12 10:12:11,642 [INFO    ] MainThread: |DBReader| Detected HWM type: 'ColumnDateTimeHWM'
    2024-04-12 10:12:11,643 [INFO    ] MainThread: |IncrementalStrategy| Fetching HWM from HorizonHWMStore:
    2024-04-12 10:12:11,643 [INFO    ] MainThread:         name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb'
    2024-04-12 10:12:12,181 [INFO    ] MainThread: |IncrementalStrategy| Fetched HWM:
    2024-04-12 10:12:12,182 [INFO    ] MainThread:         hwm = ColumnDateTimeHWM(
    2024-04-12 10:12:12,182 [INFO    ] MainThread:             name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
    2024-04-12 10:12:12,182 [INFO    ] MainThread:             entity = 'source_schema.table',
    2024-04-12 10:12:12,182 [INFO    ] MainThread:             expression = 'updated_at',
    2024-04-12 10:12:12,184 [INFO    ] MainThread:             value = datetime.datetime(2024, 4, 11, 18, 10, 2, 120000),
    2024-04-12 10:12:12,184 [INFO    ] MainThread:         )
    2024-04-12 10:12:12,184 [INFO    ] MainThread: |MSSQL| -> |Spark| Reading DataFrame from source using parameters:
    2024-04-12 10:12:12,185 [INFO    ] MainThread:         source = 'source_schema.table'
    2024-04-12 10:12:12,185 [INFO    ] MainThread:         columns = [
    2024-04-12 10:12:12,185 [INFO    ] MainThread:             'id',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:             'new_value',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:             'old_value',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:             'updated_at',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:         ]
    2024-04-12 10:12:12,187 [INFO    ] MainThread:         where = "field = 'some'"
    2024-04-12 10:12:12,187 [INFO    ] MainThread:         hwm = AutoDetectHWM(
    2024-04-12 10:12:12,187 [INFO    ] MainThread:             name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
    2024-04-12 10:12:12,187 [INFO    ] MainThread:             entity = 'source_schema.table',
    2024-04-12 10:12:12,187 [INFO    ] MainThread:             expression = 'updated_at',
    2024-04-12 10:12:12,188 [INFO    ] MainThread:         )
    2024-04-12 10:12:12,188 [INFO    ] MainThread:         options = {
    2024-04-12 10:12:12,188 [INFO    ] MainThread:             'fetchsize': 100000,
    2024-04-12 10:12:12,188 [INFO    ] MainThread:             'numPartitions': 1,
    2024-04-12 10:12:12,189 [INFO    ] MainThread:             'partitioningMode': 'range',
    2024-04-12 10:12:12,189 [INFO    ] MainThread:         }
    2024-04-12 10:12:12,189 [INFO    ] MainThread: |MSSQL| Checking connection availability...
    2024-04-12 10:12:12,189 [INFO    ] MainThread: |MSSQL| Using connection parameters:
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         user = 'db_user'
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         password = SecretStr('**********')
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         host = 'mssql.host'
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         port = 1433
    2024-04-12 10:12:12,191 [INFO    ] MainThread:         database = 'somedb'
    2024-04-12 10:12:12,191 [INFO    ] MainThread:         extra = {'applicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
    2024-04-12 10:12:12,191 [INFO    ] MainThread:         jdbc_url = 'jdbc:sqlserver:/mssql.host:1433'
    2024-04-12 10:12:12,579 [INFO    ] MainThread: |MSSQL| Connection is available.
    2024-04-12 10:12:12,581 [INFO    ] MainThread: |MSSQL| Executing SQL query (on driver):
    2024-04-12 10:12:12,581 [INFO    ] MainThread:         SELECT
    2024-04-12 10:12:12,581 [INFO    ] MainThread:                MIN(updated_at) AS "min",
    2024-04-12 10:12:12,582 [INFO    ] MainThread:                MAX(updated_at) AS "max"
    2024-04-12 10:12:12,582 [INFO    ] MainThread:         FROM
    2024-04-12 10:12:12,582 [INFO    ] MainThread:                source_schema.table
    2024-04-12 10:12:12,582 [INFO    ] MainThread:         WHERE
    2024-04-12 10:12:12,582 [INFO    ] MainThread:                (field = 'some')
    2024-04-12 10:12:12,583 [INFO    ] MainThread:           AND
    2024-04-12 10:12:12,583 [INFO    ] MainThread:                (updated_at >= CAST('2024-04-11T18:10:02.120000' AS datetime2))
    2024-04-12 10:16:22,537 [INFO    ] MainThread: |MSSQL| Received values:
    2024-04-12 10:16:22,538 [INFO    ] MainThread:         MIN(updated_at) = datetime.datetime(2024, 4, 11, 21, 10, 7, 397000)
    2024-04-12 10:16:22,538 [INFO    ] MainThread:         MAX(updated_at) = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000)
    2024-04-12 10:16:22,540 [INFO    ] MainThread: |MSSQL| Executing SQL query (on executor):
    2024-04-12 10:16:22,540 [INFO    ] MainThread:         SELECT
    2024-04-12 10:16:22,540 [INFO    ] MainThread:                id,
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                new_value,
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                old_value,
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                updated_at
    2024-04-12 10:16:22,541 [INFO    ] MainThread:         FROM
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                source_schema.table
    2024-04-12 10:16:22,542 [INFO    ] MainThread:         WHERE
    2024-04-12 10:16:22,542 [INFO    ] MainThread:                (field = 'some')
    2024-04-12 10:16:22,542 [INFO    ] MainThread:           AND
    2024-04-12 10:16:22,542 [INFO    ] MainThread:                (updated_at >  CAST('2024-04-11T18:10:02.120000' AS datetime2))
    2024-04-12 10:16:22,542 [INFO    ] MainThread:           AND
    2024-04-12 10:16:22,542 [INFO    ] MainThread:                (updated_at <= CAST('2024-04-12T13:12:02.123000' AS datetime2))
    2024-04-12 10:16:22,892 [INFO    ] MainThread: |Spark| DataFrame successfully created from SQL statement
    2024-04-12 10:16:22,892 [INFO    ] MainThread: ------------------------------------ DBReader.run() ends ------------------------------------
    2024-04-12 10:40:42,409 [INFO    ] MainThread: =================================== DBWriter.run() starts ===================================
    2024-04-12 10:40:42,409 [INFO    ] MainThread: |Spark| -> |Hive| Writing DataFrame to target using parameters:
    2024-04-12 10:40:42,410 [INFO    ] MainThread:         target = 'target_source_schema.table'
    2024-04-12 10:40:42,410 [INFO    ] MainThread:         options = {
    2024-04-12 10:40:42,410 [INFO    ] MainThread:             'mode': 'append',
    2024-04-12 10:40:42,410 [INFO    ] MainThread:             'format': 'orc',
    2024-04-12 10:40:42,410 [INFO    ] MainThread:             'partitionBy': 'part_dt',
    2024-04-12 10:40:42,410 [INFO    ] MainThread:         }
    2024-04-12 10:40:42,411 [INFO    ] MainThread:         df_schema:
    2024-04-12 10:40:42,412 [INFO    ] MainThread:             root
    2024-04-12 10:40:42,412 [INFO    ] MainThread:              |-- id: integer (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- new_value: string (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- old_value: string (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- updated_at: timestamp (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- part_dt: date (nullable = true)
    2024-04-12 10:40:42,414 [INFO    ] MainThread:
    2024-04-12 10:40:42,421 [INFO    ] MainThread: |Hive| Checking connection availability...
    2024-04-12 10:40:42,421 [INFO    ] MainThread: |Hive| Using connection parameters:
    2024-04-12 10:40:42,421 [INFO    ] MainThread:         cluster = 'dwh'
    2024-04-12 10:40:42,475 [INFO    ] MainThread: |Hive| Connection is available.
    2024-04-12 10:40:42,476 [INFO    ] MainThread: |Hive| Fetching schema of table 'target_source_schema.table' ...
    2024-04-12 10:40:43,518 [INFO    ] MainThread: |Hive| Schema fetched.
    2024-04-12 10:40:43,521 [INFO    ] MainThread: |Hive| Table 'target_source_schema.table' already exists
    2024-04-12 10:40:43,521 [WARNING ] MainThread: |Hive| User-specified options {'partitionBy': 'part_dt'} are ignored while inserting into existing table. Using only table parameters from Hive metastore
    2024-04-12 10:40:43,782 [INFO    ] MainThread: |Hive| Inserting data into existing table 'target_source_schema.table' ...
    2024-04-12 11:06:07,396 [INFO    ] MainThread: |Hive| Data is successfully inserted into table 'target_source_schema.table'.
    2024-04-12 11:06:07,397 [INFO    ] MainThread: ------------------------------------ DBWriter.run() ends ------------------------------------
    2024-04-12 11:06:07,397 [INFO    ] MainThread: |onETL| Exiting IncrementalStrategy
    2024-04-12 11:06:07,397 [INFO    ] MainThread: |IncrementalStrategy| Saving HWM to 'HorizonHWMStore':
    2024-04-12 11:06:07,397 [INFO    ] MainThread:         hwm = ColumnDateTimeHWM(
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             entity = 'source_source_schema.table',
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             expression = 'updated_at',
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             value = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000),
    2024-04-12 11:06:07,495 [INFO    ] MainThread: |IncrementalStrategy| HWM has been saved

Every step performed by onETL is logged in detail, which should help with debugging.

You can make the logs even more verbose by changing the level to DEBUG:

from onetl.log import setup_logging

setup_logging(level="DEBUG", enable_clients=True)

# rest of the code
...

This also changes the logging level of all underlying Python libraries, for example showing every HTTP request being made.

setup_logging(level=logging.INFO, *, enable_clients=False)

Set up onETL logging.

What this function does
  • Adds stderr logging handler
  • Changes root logger format to 2023-05-31 11:22:33.456 [INFO] MainThread: message
  • Changes root logger level to level
  • Changes onETL logger level to level
  • Sets up logging level of underlying client modules

.. note::

Should be used only in IDEs (like Jupyter notebooks or PyCharm),
or scripts (ETL pipelines).

.. versionchanged:: 0.5.0 Renamed setup_notebook_logging → setup_logging

Parameters

level : int or str, default INFO
    Log level for onETL module

enable_clients : bool, default False
    If True, enable logging of underlying client modules. Otherwise, set client modules log level to DISABLED.

.. note::

For ``level="DEBUG"`` it is recommended to use ``enable_clients=True``

.. versionadded:: 0.9.0

Source code in onetl/log.py
def setup_logging(level: int | str = logging.INFO, *, enable_clients: bool = False) -> None:
    """Set up onETL logging.

    What this function does:
        * Adds stderr logging handler
        * Changes root logger format to ``2023-05-31 11:22:33.456 [INFO] MainThread: message``
        * Changes root logger level to ``level``
        * Changes onETL logger level to ``level``
        * Sets up logging level of underlying client modules

    .. note::

        Should be used only in IDEs (like Jupyter notebooks or PyCharm),
        or scripts (ETL pipelines).

    .. versionchanged:: 0.5.0
        Renamed ``setup_notebook_logging`` → ``setup_logging``

    Parameters
    ----------
    level : ``int`` or ``str``, default ``INFO``
        Log level for onETL module

    enable_clients : ``bool``, default ``False``
        If ``True``, enable logging of underlying client modules.
        Otherwise, set client modules log level to ``DISABLED``.

        .. note::

            For ``level="DEBUG"`` it is recommended to use ``enable_clients=True``

        .. versionadded:: 0.9.0
    """

    logging.basicConfig(level=level)
    set_default_logging_format()

    onetl_log.setLevel(level)
    setup_clients_logging(level if enable_clients else DISABLED)
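
The steps listed in the docstring can be approximated with the standard library alone. This is a rough sketch, not the library's implementation: the function name `sketch_setup_logging`, the logger name `"onetl"`, the format string, and the numeric value of `DISABLED` are all assumptions made for illustration (the format is inferred from the sample log output earlier on this page), and the special handling of `py4j` is left out.

```python
import logging
import sys

# Assumptions for illustration only; the real constants live in onetl/log.py
ASSUMED_FORMAT = "%(asctime)s [%(levelname)-8s] %(threadName)s: %(message)s"
ASSUMED_DISABLED = logging.CRITICAL + 1  # any value above CRITICAL silences a logger
CLIENT_MODULES = ("ftputil", "hdfs", "minio", "paramiko", "py4j", "pyspark", "webdav3")


def sketch_setup_logging(level=logging.INFO, *, enable_clients=False):
    root = logging.getLogger()

    # Add a stderr handler if the root logger has none yet
    if not root.handlers:
        root.addHandler(logging.StreamHandler(sys.stderr))

    # Change the format of the root handlers and the root level
    for handler in root.handlers:
        handler.setFormatter(logging.Formatter(ASSUMED_FORMAT))
    root.setLevel(level)

    # Change the level of the library logger (name assumed to be "onetl")
    logging.getLogger("onetl").setLevel(level)

    # Either pass the level on to client modules or silence them
    client_level = level if enable_clients else ASSUMED_DISABLED
    for name in CLIENT_MODULES:
        logging.getLogger(name).setLevel(client_level)


sketch_setup_logging("DEBUG", enable_clients=True)
print(logging.getLogger("paramiko").level)
```

With `enable_clients=False` the client loggers receive a level above `CRITICAL`, so none of their records pass the level check.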

setup_clients_logging(level=DISABLED)

Set logging of underlying client modules used by onETL.

Affected modules
  • ftputil
  • hdfs
  • minio
  • paramiko
  • py4j
  • pyspark
  • webdav3

.. note::

Can be used in applications, but it is recommended to set up these loggers
according to your framework documentation.

.. versionchanged:: 0.9.0 Renamed disable_clients_logging → setup_clients_logging

Parameters

level : int or str, default DISABLED
    Log level for client modules

.. note::

    For ``py4j``, logging level with maximum verbosity is ``INFO`` because ``DEBUG`` logs are
    totally unreadable.

.. versionadded:: 0.9.0
Source code in onetl/log.py
def setup_clients_logging(level: int | str = DISABLED) -> None:
    """Set logging of underlying client modules used by onETL.

    Affected modules:
        * ``ftputil``
        * ``hdfs``
        * ``minio``
        * ``paramiko``
        * ``py4j``
        * ``pyspark``
        * ``webdav3``

    .. note::

        Can be used in applications, but it is recommended to set up these loggers
        according to your framework documentation.

    .. versionchanged:: 0.9.0
        Renamed ``disable_clients_logging`` → ``setup_clients_logging``

    Parameters
    ----------
    level : ``int`` or ``str``, default ``DISABLED``
        Log level for client modules

        .. note::

            For ``py4j``, logging level with maximum verbosity is ``INFO`` because ``DEBUG`` logs are
            totally unreadable.

        .. versionadded:: 0.9.0
    """

    for client_module in CLIENT_MODULES:
        logging.getLogger(client_module).setLevel(level)

    if isinstance(level, str):
        level = int(logging.getLevelName(level))
    logging.getLogger("py4j").setLevel(max(level, logging.INFO))
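
The last three lines cap the verbosity of `py4j`: a string level is first converted to its numeric value, and `max()` then keeps the level from dropping below INFO. A minimal standalone sketch of that logic, where the helper name `py4j_level` is hypothetical:

```python
import logging


def py4j_level(level):
    """Return the level to apply to py4j: never more verbose than INFO."""
    if isinstance(level, str):
        # For a registered name such as "DEBUG", getLevelName returns its number
        level = int(logging.getLevelName(level))
    # Lower numbers mean more verbose, so max() clamps DEBUG (10) up to INFO (20)
    return max(level, logging.INFO)


print(py4j_level("DEBUG"))        # 20
print(py4j_level(logging.DEBUG))  # 20
print(py4j_level("ERROR"))        # 40
```

Levels that are already quieter than INFO, such as ERROR, pass through unchanged. A name that is not a registered level (for example a lowercase `"debug"`) cannot be converted and raises `ValueError`.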

set_default_logging_format()

Sets the default logging format to the one preferred by onETL.

Example log message: 2023-05-31 11:22:33.456 [INFO] MainThread: message

.. note::

Should be used only in IDEs (like Jupyter notebooks or PyCharm),
or scripts (ETL pipelines).

.. warning::

Should **NOT** be used in applications, you should set up logging settings manually,
according to your framework documentation.
Source code in onetl/log.py
def set_default_logging_format() -> None:
    """Sets default logging format to preferred by onETL.

    Example log message: ``2023-05-31 11:22:33.456 [INFO] MainThread: message``

    .. note::

        Should be used only in IDEs (like Jupyter notebooks or PyCharm),
        or scripts (ETL pipelines).

    .. warning::

        Should **NOT** be used in applications, you should set up logging settings manually,
        according to your framework documentation.
    """

    handlers = onetl_log.handlers or root_log.handlers
    for handler in handlers:
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
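
The effect of replacing a handler's formatter can be seen with a small standard-library sketch. `LOG_FORMAT` is not shown on this page, so the format string below is an assumption inferred from the sample log output; the logger name `format_demo` is hypothetical.

```python
import io
import logging

# Assumed format, inferred from the sample log output; not taken from onetl/log.py
ASSUMED_LOG_FORMAT = "%(asctime)s [%(levelname)-8s] %(threadName)s: %(message)s"

# Hypothetical logger writing to an in-memory stream instead of stderr
logger = logging.getLogger("format_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))

# Same pattern as in the function above: set a new formatter on every handler
for handler in logger.handlers:
    handler.setFormatter(logging.Formatter(ASSUMED_LOG_FORMAT))

logger.info("message")
line = stream.getvalue().strip()
print(line)
```

The printed line has the shape `<date> <time>,<milliseconds> [INFO    ] <thread name>: message`. In the function above, `onetl_log.handlers or root_log.handlers` selects the onETL logger's own handlers when it has any and falls back to the root logger's handlers otherwise.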