Skip to content

ETL процессы на Airflow (DQ Airflow Operators)

Проверки DQ можно запускать в синхронном режиме в рамках Airflow DAG'ов, выполняющих загрузки данных и/или ETL процедуры.

Запуск проверок DQ и получение результатов осуществляется с помощью классов DQRunner и DQStatus в модуле dq_plugins.dq_operators

Пакет dq-plugins необходимо установить в окружение Airflow.

Для интеграции DQ в свой даг требуется:

  1. Создать группу проверок DQ для требуемого объекта проверки - описать метрики и правила проверок
  2. Передать в DQRunner параметр group
  3. Встроить DQRunner в pipeline дага
  4. Опционально: встроить DQStatus в pipeline дага (примеры ниже)

Шаблон для вызова DQ в Airflow dags (запуск и получение результата проверок):

DQ в даге airflow

from dq_plugins.dq_operators import DQRunner, DQStatus

run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09')
status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09')

# Таски для запуска DQ и получения статуса проверок
run_dq >> status


Airflow Connection & Variables

Прежде чем использовать dq_operators, необходимо убедиться, что в Airflow создан connection к DQ API:

Параметр Значение
Conn Id dq_api
Conn Type HTTP
Host http://localhost
Port 443
Password access token (как получить)
Login <поле нужно оставить пустым>

Если у вас настроена отправка логов DQ в ELK или OpenSearch, можно настроить сохранение ссылки на Kibana по конкретному запуску проверок в лог таска DQRunner. Для этого в Airflow нужно создать следующий Variable:

Key Value
dq_kibana_url https://kibana-host/.../app/kibana#/discover?_g=(refreshInterval:(pause:!t,value:0),time:(from:'{time_from}',to:'{time_to}'))&...,query:(language:kuery,query:'run_id:%20%22{run_id}%22%20and%20levelname:%20ERROR'),...

В значении переменной можно использовать плэйсходеры, которые будут заполнены фактическими значениями в момент генерации ссылки:

  • {time_from} - фильтр по времени начала вычисления проверок

  • {time_to} - фильтр по времени окончания вычисления проверок

  • {run_id} - уникальный UUID запуска проверок


Запуск DQ проверок (класс DQRunner)

Оператор DQRunner осуществляет запуск процесса подсчета DQ метрик.

По-умолчанию, оператор будет ждать, пока не завершится подсчет метрик на объекте проверки, и только после этого таск перейдет в состояние success

Это поведение регулируется параметрами wait (дожидаться окончания или нет) и dq_job_wait_time (максимальное время ожидания)

Параметры DQRunner:

  • dag - DAG инстанс основного (родительского) ETL процесса

  • task_id - название таска

    • по-умолчанию: dq_runner_<значение параметра group>
  • group - имя группы проверок

  • date - дата в формате YYYY-MM-DD, за которую будут запущены проверки  (параметр опционален)

    • по-умолчанию, дата вычисляется как next_execution_date текущего DagRun
  • runtime_params - параметры для динамической фильтрации данных runtime_params в конфигах

  • wait - булево значение

    • True - ждать, пока не завершится процесс DQ

    • False - просто запустить подсчет метрик и не ждать, пока он завершится

    • по умолчанию = True

  • http_conn_id- параметры подключения к API из Airflow Connection

    • по-умолчанию = "dq_api"
  • dq_job_wait_time - максимальное время ожидания, пока не завершится процесс подсчета DQ метрик (в секундах)

    • по-умолчанию = 3600, если вы ожидаете что проверка может занять больше времени то лучше задать
  • dq_job_sleep_interval - период опроса DQ процесса - завершился или все еще работает (в секундах)

    • по-умолчанию = 10
  • force - булево значение

    • True - выполнять проверки при любых условиях

    • False - проверки будет выполняться, если за данную дату нет результатов или они отличны от успешных (статус группы проверок != 1)

    • по-умолчанию = True

  • fail - булево значение

    • True - таск будет "красным", если хотя бы одна метрика упала с исключением (например, ошибка в sql метрики)

    • False - таск будет "зеленым" независимо от статуса метрик

    • по-умолчанию = False

В параметрах DQRunner можно использовать значение из xcom, например:

Параметры из xcom

run_dq = DQRunner(dag=dag, group='test_ETL_demo', date="{{ti.xcom_pull(task_ids='push_date_task', key='check_date')}}")


Пример синхронного вызова DQRunner:

Синхронный вызов DQ в даге airflow

from dq_plugins.dq_operators import DQRunner

start_task = DummyOperator( task_id="start_ETL_task", dag=dag )
end_task = DummyOperator( task_id="end_ETL_task", dag=dag )

dq_job = DQRunner(dag=dag, group='test_DQ', date='2025-01-09')
start_task >> dq_job >> end_task


Получение статуса DQ проверок в даге (класс DQStatus)

Оператор DQStatus позволяет получить агрегированный статус DQ метрик по именны группы проверок (group).

В зависимости от параметров оператор может:

  1. Возвращать статус проверки в виде словаря {<date:str>: <status:int>} в X-Com для последующего чтения (параметр xcom_push=True)

  2. Падать и ничего не возвращать, если статус отрицательный (параметр fail_if_negative=True)


Параметры DQStatus:

  • dag - DAG инстанс основного (родительского) ETL процесса

  • task_id - название таска

    • по-умолчанию: dq_check_status_<значение параметра group>
  • group - имя группы проверок

  • date - дата в формате YYYY-MM-DD, за которую будут запущены проверки (параметр опционален)

    • по-умолчанию, дата вычисляется как next_execution_date текущего DagRun
  • date_from и date_to - даты в формате YYYY-MM-DD, задают диапазон за которые необходимо получить результаты проверок. Может использоваться вместо параметра date

  • http_conn_id - параметры подключения к API из Airflow Connection

    • по-умолчанию = "dq_api"
  • xcom_push - булево значение

    • True - кладет результат в X-COM

    • False - в x-com ничего не записывается

    • по умолчанию = False

  • fail_if_negative - булево значение

    • True - в случае отрицательного статуса группы проверок таск будет "красным" (failed стасус в airflow)

    • False - таск всегда "зеленый"

    • по умолчанию = True

Пример вызова DQStatus:

from dq_plugins.dq_operators import DQRunner, DQStatus

run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09')
status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09')

run_dq >> status

Результат проверки доступен в XCom в таске dq_status при его завершение, самостоятельно прочесть статус можно оттуда.


Логи DQ

Оператор DQRunner сохраняет в лог Airfow уникальный идентификатор DQ задачи - run_id:

[2025-01-10 17:00:34,060] {dq_operators.py:203} INFO - Started run_dq job for group 'test_ETL_demo' and date '2025-01-09'
[2025-01-10 17:00:34,060] {dq_operators.py:205} INFO - DQ process run_id: 2cbb6b6b-8492-48eb-9cbd-f76e407ddd6c

Детальные логи о работе DQ, в том числе сообщения об ошибках при подсчете отдельных метрик, доступны в Kibana, где их можно найти по run_id конкретного таска DQ


Примеры синхронного вызова DQ


Запустить подсчет DQ метрик и не ждать завершения процесса

Синхронный вызов DQ в даге airflow

from dq_plugins.dq_operators import DQRunner, DQStatus

start_task = DummyOperator( task_id="start_ETL_task", dag=dag )
end_task = DummyOperator( task_id="end_ETL_task", dag=dag )

run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09', wait=False)

start_task >> run_dq >> end_task


Запустить подсчет DQ метрик, дождаться выполнения, получить статус

Синхронный вызов DQ в даге airflow

from dq_plugins.dq_operators import DQRunner, DQStatus

start_task = DummyOperator( task_id="start_ETL_task", dag=dag )
end_task = DummyOperator( task_id="end_ETL_task", dag=dag )

run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09')
status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09')

start_task >> run_dq >> status >> end_task


Запустить подсчет DQ метрик, дождаться выполнения, получить статус

Синхронный вызов DQ в даге airflow

from dq_plugins.dq_operators import DQRunner, DQStatus

start_task = DummyOperator( task_id="start_ETL_task", dag=dag )

run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09')
status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09', task_id='dq_check_status', xcom_push=True)

def check_xcom(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='dq_check_status')
    next_task = 'success'

    for _, status in msg.items():
        if status <= 0:
            next_task = 'fail'

    return next_task

branching = BranchPythonOperator(task_id='branching', python_callable=check_xcom, dag=dag, provide_context=True)

start_task >> run_dq >> status >> branching >> [success, fail]


Значения статуса группы проверок (check_status)

Значения статуса группы проверок, доступные в xcom и обрабатываемые в таске Check_status имеют следующую семантику:

check_status Описание
1 Проверки прошли успешно
0 Значение референсной метрики не найдено в базе DQ. Скорее всего референсная метрика не была посчитаны за дату проверки
-1 Проверки прошли не успешно (несоответствие пороговым значениям из правила проверки)
-2 Значение метрики не найдено в базе DQ. Скорее всего метрики еще не были посчитаны за запрашиваемую дату