Logging
Logging is essential for understanding what happens under the hood of onETL.

The default logging level of the Python interpreter is WARNING, but most onETL logs are emitted at the INFO level, so users usually see very little.

To change the logging level, use the [setup_logging][onetl.log.setup_logging] function, which should be called at the very beginning of the script:
```python
from onetl.log import setup_logging
from other.lib import some, more, imports

setup_logging()

# rest of the code
...
```
This changes both the logging level and the log format to something like this:
View logs
2024-04-12 10:12:10,834 [INFO ] MainThread: |onETL| Using IncrementalStrategy as a strategy
2024-04-12 10:12:10,835 [INFO ] MainThread: =================================== DBReader.run() starts ===================================
2024-04-12 10:12:10,835 [INFO ] MainThread: |DBReader| Getting Spark type for HWM expression: 'updated_at'
2024-04-12 10:12:10,836 [INFO ] MainThread: |MSSQL| Fetching schema of table 'source_schema.table' ...
2024-04-12 10:12:11,636 [INFO ] MainThread: |MSSQL| Schema fetched.
2024-04-12 10:12:11,642 [INFO ] MainThread: |DBReader| Got Spark field: StructField('updated_at', TimestampType(), True)
2024-04-12 10:12:11,642 [INFO ] MainThread: |DBReader| Detected HWM type: 'ColumnDateTimeHWM'
2024-04-12 10:12:11,643 [INFO ] MainThread: |IncrementalStrategy| Fetching HWM from HorizonHWMStore:
2024-04-12 10:12:11,643 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb'
2024-04-12 10:12:12,181 [INFO ] MainThread: |IncrementalStrategy| Fetched HWM:
2024-04-12 10:12:12,182 [INFO ] MainThread: hwm = ColumnDateTimeHWM(
2024-04-12 10:12:12,182 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb',
2024-04-12 10:12:12,182 [INFO ] MainThread: entity = 'source_schema.table',
2024-04-12 10:12:12,182 [INFO ] MainThread: expression = 'updated_at',
2024-04-12 10:12:12,184 [INFO ] MainThread: value = datetime.datetime(2024, 4, 11, 18, 10, 2, 120000),
2024-04-12 10:12:12,184 [INFO ] MainThread: )
2024-04-12 10:12:12,184 [INFO ] MainThread: |MSSQL| -> |Spark| Reading DataFrame from source using parameters:
2024-04-12 10:12:12,185 [INFO ] MainThread: source = 'source_schema.table'
2024-04-12 10:12:12,185 [INFO ] MainThread: columns = [
2024-04-12 10:12:12,185 [INFO ] MainThread: 'id',
2024-04-12 10:12:12,186 [INFO ] MainThread: 'new_value',
2024-04-12 10:12:12,186 [INFO ] MainThread: 'old_value',
2024-04-12 10:12:12,186 [INFO ] MainThread: 'updated_at',
2024-04-12 10:12:12,186 [INFO ] MainThread: ]
2024-04-12 10:12:12,187 [INFO ] MainThread: where = "field = 'some'"
2024-04-12 10:12:12,187 [INFO ] MainThread: hwm = AutoDetectHWM(
2024-04-12 10:12:12,187 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb',
2024-04-12 10:12:12,187 [INFO ] MainThread: entity = 'source_schema.table',
2024-04-12 10:12:12,187 [INFO ] MainThread: expression = 'updated_at',
2024-04-12 10:12:12,188 [INFO ] MainThread: )
2024-04-12 10:12:12,188 [INFO ] MainThread: options = {
2024-04-12 10:12:12,188 [INFO ] MainThread: 'fetchsize': 100000,
2024-04-12 10:12:12,188 [INFO ] MainThread: 'numPartitions': 1,
2024-04-12 10:12:12,189 [INFO ] MainThread: 'partitioningMode': 'range',
2024-04-12 10:12:12,189 [INFO ] MainThread: }
2024-04-12 10:12:12,189 [INFO ] MainThread: |MSSQL| Checking connection availability...
2024-04-12 10:12:12,189 [INFO ] MainThread: |MSSQL| Using connection parameters:
2024-04-12 10:12:12,190 [INFO ] MainThread: user = 'db_user'
2024-04-12 10:12:12,190 [INFO ] MainThread: password = SecretStr('**********')
2024-04-12 10:12:12,190 [INFO ] MainThread: host = 'mssql.host'
2024-04-12 10:12:12,190 [INFO ] MainThread: port = 1433
2024-04-12 10:12:12,191 [INFO ] MainThread: database = 'somedb'
2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'applicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
2024-04-12 10:12:12,191 [INFO ] MainThread: jdbc_url = 'jdbc:sqlserver://mssql.host:1433'
2024-04-12 10:12:12,579 [INFO ] MainThread: |MSSQL| Connection is available.
2024-04-12 10:12:12,581 [INFO ] MainThread: |MSSQL| Executing SQL query (on driver):
2024-04-12 10:12:12,581 [INFO ] MainThread: SELECT
2024-04-12 10:12:12,581 [INFO ] MainThread: MIN(updated_at) AS "min",
2024-04-12 10:12:12,582 [INFO ] MainThread: MAX(updated_at) AS "max"
2024-04-12 10:12:12,582 [INFO ] MainThread: FROM
2024-04-12 10:12:12,582 [INFO ] MainThread: source_schema.table
2024-04-12 10:12:12,582 [INFO ] MainThread: WHERE
2024-04-12 10:12:12,582 [INFO ] MainThread: (field = 'some')
2024-04-12 10:12:12,583 [INFO ] MainThread: AND
2024-04-12 10:12:12,583 [INFO ] MainThread: (updated_at >= CAST('2024-04-11T18:10:02.120000' AS datetime2))
2024-04-12 10:16:22,537 [INFO ] MainThread: |MSSQL| Received values:
2024-04-12 10:16:22,538 [INFO ] MainThread: MIN(updated_at) = datetime.datetime(2024, 4, 11, 21, 10, 7, 397000)
2024-04-12 10:16:22,538 [INFO ] MainThread: MAX(updated_at) = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000)
2024-04-12 10:16:22,540 [INFO ] MainThread: |MSSQL| Executing SQL query (on executor):
2024-04-12 10:16:22,540 [INFO ] MainThread: SELECT
2024-04-12 10:16:22,540 [INFO ] MainThread: id,
2024-04-12 10:16:22,541 [INFO ] MainThread: new_value,
2024-04-12 10:16:22,541 [INFO ] MainThread: old_value,
2024-04-12 10:16:22,541 [INFO ] MainThread: updated_at
2024-04-12 10:16:22,541 [INFO ] MainThread: FROM
2024-04-12 10:16:22,541 [INFO ] MainThread: source_schema.table
2024-04-12 10:16:22,542 [INFO ] MainThread: WHERE
2024-04-12 10:16:22,542 [INFO ] MainThread: (field = 'some')
2024-04-12 10:16:22,542 [INFO ] MainThread: AND
2024-04-12 10:16:22,542 [INFO ] MainThread: (updated_at > CAST('2024-04-11T18:10:02.120000' AS datetime2))
2024-04-12 10:16:22,542 [INFO ] MainThread: AND
2024-04-12 10:16:22,542 [INFO ] MainThread: (updated_at <= CAST('2024-04-12T13:12:02.123000' AS datetime2))
2024-04-12 10:16:22,892 [INFO ] MainThread: |Spark| DataFrame successfully created from SQL statement
2024-04-12 10:16:22,892 [INFO ] MainThread: ------------------------------------ DBReader.run() ends ------------------------------------
2024-04-12 10:40:42,409 [INFO ] MainThread: =================================== DBWriter.run() starts ===================================
2024-04-12 10:40:42,409 [INFO ] MainThread: |Spark| -> |Hive| Writing DataFrame to target using parameters:
2024-04-12 10:40:42,410 [INFO ] MainThread: target = 'target_source_schema.table'
2024-04-12 10:40:42,410 [INFO ] MainThread: options = {
2024-04-12 10:40:42,410 [INFO ] MainThread: 'mode': 'append',
2024-04-12 10:40:42,410 [INFO ] MainThread: 'format': 'orc',
2024-04-12 10:40:42,410 [INFO ] MainThread: 'partitionBy': 'part_dt',
2024-04-12 10:40:42,410 [INFO ] MainThread: }
2024-04-12 10:40:42,411 [INFO ] MainThread: df_schema:
2024-04-12 10:40:42,412 [INFO ] MainThread: root
2024-04-12 10:40:42,412 [INFO ] MainThread: |-- id: integer (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- new_value: string (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- old_value: string (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- updated_at: timestamp (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- part_dt: date (nullable = true)
2024-04-12 10:40:42,414 [INFO ] MainThread:
2024-04-12 10:40:42,421 [INFO ] MainThread: |Hive| Checking connection availability...
2024-04-12 10:40:42,421 [INFO ] MainThread: |Hive| Using connection parameters:
2024-04-12 10:40:42,421 [INFO ] MainThread: cluster = 'dwh'
2024-04-12 10:40:42,475 [INFO ] MainThread: |Hive| Connection is available.
2024-04-12 10:40:42,476 [INFO ] MainThread: |Hive| Fetching schema of table 'target_source_schema.table' ...
2024-04-12 10:40:43,518 [INFO ] MainThread: |Hive| Schema fetched.
2024-04-12 10:40:43,521 [INFO ] MainThread: |Hive| Table 'target_source_schema.table' already exists
2024-04-12 10:40:43,521 [WARNING ] MainThread: |Hive| User-specified options {'partitionBy': 'part_dt'} are ignored while inserting into existing table. Using only table parameters from Hive metastore
2024-04-12 10:40:43,782 [INFO ] MainThread: |Hive| Inserting data into existing table 'target_source_schema.table' ...
2024-04-12 11:06:07,396 [INFO ] MainThread: |Hive| Data is successfully inserted into table 'target_source_schema.table'.
2024-04-12 11:06:07,397 [INFO ] MainThread: ------------------------------------ DBWriter.run() ends ------------------------------------
2024-04-12 11:06:07,397 [INFO ] MainThread: |onETL| Exiting IncrementalStrategy
2024-04-12 11:06:07,397 [INFO ] MainThread: |IncrementalStrategy| Saving HWM to 'HorizonHWMStore':
2024-04-12 11:06:07,397 [INFO ] MainThread: hwm = ColumnDateTimeHWM(
2024-04-12 11:06:07,397 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb',
2024-04-12 11:06:07,397 [INFO ] MainThread: entity = 'source_schema.table',
2024-04-12 11:06:07,397 [INFO ] MainThread: expression = 'updated_at',
2024-04-12 11:06:07,397 [INFO ] MainThread: value = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000),
2024-04-12 11:06:07,397 [INFO ] MainThread: )
2024-04-12 11:06:07,495 [INFO ] MainThread: |IncrementalStrategy| HWM has been saved
Every step performed by onETL is logged in detail, which should help with debugging.
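These records are emitted through Python's standard `logging` module, so instead of `setup_logging()` you can configure logging yourself. A minimal sketch, assuming the onETL logger hierarchy is rooted at `onetl`; the format string only approximates the output shown above:

```python
import logging

# Approximation of the format shown in the example log above;
# adjust the format string to taste.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)-8s] %(threadName)s: %(message)s",
)

# Or raise the level for onETL only, leaving other libraries as-is:
logging.getLogger("onetl").setLevel(logging.INFO)
```

This gives finer control than `setup_logging()`, at the cost of configuring handlers and formats manually.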
You can make the logs even more verbose by changing the level to DEBUG:

```python
from onetl.log import setup_logging

setup_logging(level="DEBUG", enable_clients=True)

# rest of the code
...
```
This also changes the logging level of the underlying Python client libraries, e.g. showing every HTTP request being performed, and so on.
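If DEBUG output from client libraries gets too noisy, specific loggers can be dialed back down afterwards, since everything goes through the standard `logging` module. A sketch; the logger names below (`py4j`, `urllib3`) are examples of typically chatty libraries, not part of onETL:

```python
import logging

# Quiet down individual client loggers after enabling DEBUG globally.
logging.getLogger("py4j").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
```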
::: onetl.log
    options:
        members:
            - setup_logging
            - setup_clients_logging
            - set_default_logging_format