Skip to content

Логирование

Логирование очень важно для понимания того, что происходит под капотом onETL.

Уровень логирования по умолчанию для интерпретаторов Python - WARNING, но большинство логов onETL находятся на уровне INFO, поэтому пользователи обычно мало что видят.

Чтобы изменить уровень логирования, есть функция [setup_logging][onetl.log.setup_logging], которую следует вызывать в начале скрипта:

from onetl.log import setup_logging
from other.lib import some, more, imports

setup_logging()

# остальной код
...

Это изменяет как уровень логирования, так и формат логирования на что-то вроде этого:

Посмотреть логи
    2024-04-12 10:12:10,834 [INFO    ] MainThread: |onETL| Using IncrementalStrategy as a strategy
    2024-04-12 10:12:10,835 [INFO    ] MainThread: =================================== DBReader.run() starts ===================================
    2024-04-12 10:12:10,835 [INFO    ] MainThread: |DBReader| Getting Spark type for HWM expression: 'updated_at'
    2024-04-12 10:12:10,836 [INFO    ] MainThread: |MSSQL| Fetching schema of table 'source_schema.table' ...
    2024-04-12 10:12:11,636 [INFO    ] MainThread: |MSSQL| Schema fetched.
    2024-04-12 10:12:11,642 [INFO    ] MainThread: |DBReader| Got Spark field: StructField('updated_at', TimestampType(), True)
    2024-04-12 10:12:11,642 [INFO    ] MainThread: |DBReader| Detected HWM type: 'ColumnDateTimeHWM'
    2024-04-12 10:12:11,643 [INFO    ] MainThread: |IncrementalStrategy| Fetching HWM from HorizonHWMStore:
    2024-04-12 10:12:11,643 [INFO    ] MainThread:         name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb'
    2024-04-12 10:12:12,181 [INFO    ] MainThread: |IncrementalStrategy| Fetched HWM:
    2024-04-12 10:12:12,182 [INFO    ] MainThread:         hwm = ColumnDateTimeHWM(
    2024-04-12 10:12:12,182 [INFO    ] MainThread:             name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
    2024-04-12 10:12:12,182 [INFO    ] MainThread:             entity = 'source_schema.table',
    2024-04-12 10:12:12,182 [INFO    ] MainThread:             expression = 'updated_at',
    2024-04-12 10:12:12,184 [INFO    ] MainThread:             value = datetime.datetime(2024, 4, 11, 18, 10, 2, 120000),
    2024-04-12 10:12:12,184 [INFO    ] MainThread:         )
    2024-04-12 10:12:12,184 [INFO    ] MainThread: |MSSQL| -> |Spark| Reading DataFrame from source using parameters:
    2024-04-12 10:12:12,185 [INFO    ] MainThread:         source = 'source_schema.table'
    2024-04-12 10:12:12,185 [INFO    ] MainThread:         columns = [
    2024-04-12 10:12:12,185 [INFO    ] MainThread:             'id',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:             'new_value',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:             'old_value',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:             'updated_at',
    2024-04-12 10:12:12,186 [INFO    ] MainThread:         ]
    2024-04-12 10:12:12,187 [INFO    ] MainThread:         where = "field = 'some'"
    2024-04-12 10:12:12,187 [INFO    ] MainThread:         hwm = AutoDetectHWM(
    2024-04-12 10:12:12,187 [INFO    ] MainThread:             name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
    2024-04-12 10:12:12,187 [INFO    ] MainThread:             entity = 'source_schema.table',
    2024-04-12 10:12:12,187 [INFO    ] MainThread:             expression = 'updated_at',
    2024-04-12 10:12:12,188 [INFO    ] MainThread:         )
    2024-04-12 10:12:12,188 [INFO    ] MainThread:         options = {
    2024-04-12 10:12:12,188 [INFO    ] MainThread:             'fetchsize': 100000,
    2024-04-12 10:12:12,188 [INFO    ] MainThread:             'numPartitions': 1,
    2024-04-12 10:12:12,189 [INFO    ] MainThread:             'partitioningMode': 'range',
    2024-04-12 10:12:12,189 [INFO    ] MainThread:         }
    2024-04-12 10:12:12,189 [INFO    ] MainThread: |MSSQL| Checking connection availability...
    2024-04-12 10:12:12,189 [INFO    ] MainThread: |MSSQL| Using connection parameters:
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         user = 'db_user'
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         password = SecretStr('**********')
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         host = 'mssql.host'
    2024-04-12 10:12:12,190 [INFO    ] MainThread:         port = 1433
    2024-04-12 10:12:12,191 [INFO    ] MainThread:         database = 'somedb'
    2024-04-12 10:12:12,191 [INFO    ] MainThread:         extra = {'applicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
    2024-04-12 10:12:12,191 [INFO    ] MainThread:         jdbc_url = 'jdbc:sqlserver:/mssql.host:1433'
    2024-04-12 10:12:12,579 [INFO    ] MainThread: |MSSQL| Connection is available.
    2024-04-12 10:12:12,581 [INFO    ] MainThread: |MSSQL| Executing SQL query (on driver):
    2024-04-12 10:12:12,581 [INFO    ] MainThread:         SELECT
    2024-04-12 10:12:12,581 [INFO    ] MainThread:                MIN(updated_at) AS "min",
    2024-04-12 10:12:12,582 [INFO    ] MainThread:                MAX(updated_at) AS "max"
    2024-04-12 10:12:12,582 [INFO    ] MainThread:         FROM
    2024-04-12 10:12:12,582 [INFO    ] MainThread:                source_schema.table
    2024-04-12 10:12:12,582 [INFO    ] MainThread:         WHERE
    2024-04-12 10:12:12,582 [INFO    ] MainThread:                (field = 'some')
    2024-04-12 10:12:12,583 [INFO    ] MainThread:           AND
    2024-04-12 10:12:12,583 [INFO    ] MainThread:                (updated_at >= CAST('2024-04-11T18:10:02.120000' AS datetime2))
    2024-04-12 10:16:22,537 [INFO    ] MainThread: |MSSQL| Received values:
    2024-04-12 10:16:22,538 [INFO    ] MainThread:         MIN(updated_at) = datetime.datetime(2024, 4, 11, 21, 10, 7, 397000)
    2024-04-12 10:16:22,538 [INFO    ] MainThread:         MAX(updated_at) = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000)
    2024-04-12 10:16:22,540 [INFO    ] MainThread: |MSSQL| Executing SQL query (on executor):
    2024-04-12 10:16:22,540 [INFO    ] MainThread:         SELECT
    2024-04-12 10:16:22,540 [INFO    ] MainThread:                id,
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                new_value,
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                old_value,
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                updated_at
    2024-04-12 10:16:22,541 [INFO    ] MainThread:         FROM
    2024-04-12 10:16:22,541 [INFO    ] MainThread:                source_schema.table
    2024-04-12 10:16:22,542 [INFO    ] MainThread:         WHERE
    2024-04-12 10:16:22,542 [INFO    ] MainThread:                (field = 'some')
    2024-04-12 10:16:22,542 [INFO    ] MainThread:           AND
    2024-04-12 10:16:22,542 [INFO    ] MainThread:                (updated_at >  CAST('2024-04-11T18:10:02.120000' AS datetime2))
    2024-04-12 10:16:22,542 [INFO    ] MainThread:           AND
    2024-04-12 10:16:22,542 [INFO    ] MainThread:                (updated_at <= CAST('2024-04-12T13:12:02.123000' AS datetime2))
    2024-04-12 10:16:22,892 [INFO    ] MainThread: |Spark| DataFrame successfully created from SQL statement
    2024-04-12 10:16:22,892 [INFO    ] MainThread: ------------------------------------ DBReader.run() ends ------------------------------------
    2024-04-12 10:40:42,409 [INFO    ] MainThread: =================================== DBWriter.run() starts ===================================
    2024-04-12 10:40:42,409 [INFO    ] MainThread: |Spark| -> |Hive| Writing DataFrame to target using parameters:
    2024-04-12 10:40:42,410 [INFO    ] MainThread:         target = 'target_source_schema.table'
    2024-04-12 10:40:42,410 [INFO    ] MainThread:         options = {
    2024-04-12 10:40:42,410 [INFO    ] MainThread:             'mode': 'append',
    2024-04-12 10:40:42,410 [INFO    ] MainThread:             'format': 'orc',
    2024-04-12 10:40:42,410 [INFO    ] MainThread:             'partitionBy': 'part_dt',
    2024-04-12 10:40:42,410 [INFO    ] MainThread:         }
    2024-04-12 10:40:42,411 [INFO    ] MainThread:         df_schema:
    2024-04-12 10:40:42,412 [INFO    ] MainThread:             root
    2024-04-12 10:40:42,412 [INFO    ] MainThread:              |-- id: integer (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- new_value: string (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- old_value: string (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- updated_at: timestamp (nullable = true)
    2024-04-12 10:40:42,413 [INFO    ] MainThread:              |-- part_dt: date (nullable = true)
    2024-04-12 10:40:42,414 [INFO    ] MainThread:
    2024-04-12 10:40:42,421 [INFO    ] MainThread: |Hive| Checking connection availability...
    2024-04-12 10:40:42,421 [INFO    ] MainThread: |Hive| Using connection parameters:
    2024-04-12 10:40:42,421 [INFO    ] MainThread:         cluster = 'dwh'
    2024-04-12 10:40:42,475 [INFO    ] MainThread: |Hive| Connection is available.
    2024-04-12 10:40:42,476 [INFO    ] MainThread: |Hive| Fetching schema of table 'target_source_schema.table' ...
    2024-04-12 10:40:43,518 [INFO    ] MainThread: |Hive| Schema fetched.
    2024-04-12 10:40:43,521 [INFO    ] MainThread: |Hive| Table 'target_source_schema.table' already exists
    2024-04-12 10:40:43,521 [WARNING ] MainThread: |Hive| User-specified options {'partitionBy': 'part_dt'} are ignored while inserting into existing table. Using only table parameters from Hive metastore
    2024-04-12 10:40:43,782 [INFO    ] MainThread: |Hive| Inserting data into existing table 'target_source_schema.table' ...
    2024-04-12 11:06:07,396 [INFO    ] MainThread: |Hive| Data is successfully inserted into table 'target_source_schema.table'.
    2024-04-12 11:06:07,397 [INFO    ] MainThread: ------------------------------------ DBWriter.run() ends ------------------------------------
    2024-04-12 11:06:07,397 [INFO    ] MainThread: |onETL| Exiting IncrementalStrategy
    2024-04-12 11:06:07,397 [INFO    ] MainThread: |IncrementalStrategy| Saving HWM to 'HorizonHWMStore':
    2024-04-12 11:06:07,397 [INFO    ] MainThread:         hwm = ColumnDateTimeHWM(
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             entity = 'source_source_schema.table',
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             expression = 'updated_at',
    2024-04-12 11:06:07,397 [INFO    ] MainThread:             value = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000),
    2024-04-12 11:06:07,495 [INFO    ] MainThread: |IncrementalStrategy| HWM has been saved

Каждый шаг, выполняемый onETL, тщательно логируется, что должно помочь с отладкой.

Вы можете сделать логи еще более подробными, изменив уровень на DEBUG:

from onetl.log import setup_logging

setup_logging(level="DEBUG", enable_clients=True)

# остальной код
...

Это также изменяет уровень логирования для всех базовых библиотек Python, например, показывая каждый выполняемый HTTP-запрос и т. д.

::: onetl.log options: members: - setup_logging - setup_clients_logging - set_default_logging_format