ETL процессы на Airflow (DQ Airflow Operators)
Проверки DQ можно запускать в синхронном режиме в рамках Airflow DAG'ов, выполняющих загрузки данных и/или ETL процедуры.
Запуск проверок DQ и получение результатов осуществляется с помощью классов DQRunner и DQStatus в модуле dq_plugins.dq_operators
Пакет dq-plugins необходимо установить в окружение Airflow.
Для интеграции DQ в свой даг требуется:
- Создать группу проверок DQ для требуемого объекта проверки - описать метрики и правила проверок
- Передать в DQRunner параметр group
- Встроить DQRunner в pipeline дага
- Опционально: встроить DQStatus в pipeline дага (примеры ниже)
Шаблон для вызова DQ в Airflow dags (запуск и получение результата проверок):
DQ в даге airflow
from dq_plugins.dq_operators import DQRunner, DQStatus run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09') status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09') # Таски для запуска DQ и получения статуса проверок run_dq >> status
Airflow Connection & Variables
Прежде чем использовать dq_operators, необходимо убедиться, что в Airflow создан connection к DQ API:
| Параметр | Значение |
|---|---|
| Conn Id | dq_api |
| Conn Type | HTTP |
| Host | http://localhost |
| Port | 443 |
| Password | access token (как получить) |
| Login | <поле нужно оставить пустым> |
Если у вас настроена отправка логов DQ в ELK или OpenSearch, можно настроить сохранение ссылки на Kibana по конкретному запуску проверок в лог таска DQRunner. Для этого в Airflow нужно создать следующий Variable:
| Key | Value |
|---|---|
| dq_kibana_url | https://kibana-host/.../app/kibana#/discover?_g=(refreshInterval:(pause:!t,value:0),time:(from:'{time_from}',to:'{time_to}'))&...,query:(language:kuery,query:'run_id:%20%22{run_id}%22%20and%20levelname:%20ERROR'),... |
В значении переменной можно использовать плэйсходеры, которые будут заполнены фактическими значениями в момент генерации ссылки:
-
{time_from} - фильтр по времени начала вычисления проверок
-
{time_to} - фильтр по времени окончания вычисления проверок
-
{run_id} - уникальный UUID запуска проверок
Запуск DQ проверок (класс DQRunner)
Оператор DQRunner осуществляет запуск процесса подсчета DQ метрик.
По-умолчанию, оператор будет ждать, пока не завершится подсчет метрик на объекте проверки, и только после этого таск перейдет в состояние success
Это поведение регулируется параметрами wait (дожидаться окончания или нет) и dq_job_wait_time (максимальное время ожидания)
Параметры DQRunner:
-
dag - DAG инстанс основного (родительского) ETL процесса
-
task_id - название таска
- по-умолчанию: dq_runner_<значение параметра group>
-
group - имя группы проверок
-
date - дата в формате YYYY-MM-DD, за которую будут запущены проверки (параметр опционален)
- по-умолчанию, дата вычисляется как next_execution_date текущего DagRun
-
runtime_params - параметры для динамической фильтрации данных runtime_params в конфигах
-
wait - булево значение
-
True - ждать, пока не завершится процесс DQ
-
False - просто запустить подсчет метрик и не ждать, пока он завершится
-
по умолчанию = True
-
-
http_conn_id- параметры подключения к API из Airflow Connection
- по-умолчанию = "dq_api"
-
dq_job_wait_time - максимальное время ожидания, пока не завершится процесс подсчета DQ метрик (в секундах)
- по-умолчанию = 3600, если вы ожидаете что проверка может занять больше времени то лучше задать
-
dq_job_sleep_interval - период опроса DQ процесса - завершился или все еще работает (в секундах)
- по-умолчанию = 10
-
force - булево значение
-
True - выполнять проверки при любых условиях
-
False - проверки будет выполняться, если за данную дату нет результатов или они отличны от успешных (статус группы проверок != 1)
-
по-умолчанию = True
-
-
fail - булево значение
-
True - таск будет "красным", если хотя бы одна метрика упала с исключением (например, ошибка в sql метрики)
-
False - таск будет "зеленым" независимо от статуса метрик
-
по-умолчанию = False
-
В параметрах DQRunner можно использовать значение из xcom, например:
Параметры из xcom
run_dq = DQRunner(dag=dag, group='test_ETL_demo', date="{{ti.xcom_pull(task_ids='push_date_task', key='check_date')}}")
Пример синхронного вызова DQRunner:
Синхронный вызов DQ в даге airflow
from dq_plugins.dq_operators import DQRunner start_task = DummyOperator( task_id="start_ETL_task", dag=dag ) end_task = DummyOperator( task_id="end_ETL_task", dag=dag ) dq_job = DQRunner(dag=dag, group='test_DQ', date='2025-01-09') start_task >> dq_job >> end_task
Получение статуса DQ проверок в даге (класс DQStatus)
Оператор DQStatus позволяет получить агрегированный статус DQ метрик по именны группы проверок (group).
В зависимости от параметров оператор может:
-
Возвращать статус проверки в виде словаря {<date:str>: <status:int>} в X-Com для последующего чтения (параметр xcom_push=True)
-
Падать и ничего не возвращать, если статус отрицательный (параметр fail_if_negative=True)
Параметры DQStatus:
-
dag - DAG инстанс основного (родительского) ETL процесса
-
task_id - название таска
- по-умолчанию: dq_check_status_<значение параметра group>
-
group - имя группы проверок
-
date - дата в формате YYYY-MM-DD, за которую будут запущены проверки (параметр опционален)
- по-умолчанию, дата вычисляется как next_execution_date текущего DagRun
-
date_from и date_to - даты в формате YYYY-MM-DD, задают диапазон за которые необходимо получить результаты проверок. Может использоваться вместо параметра date
-
http_conn_id - параметры подключения к API из Airflow Connection
- по-умолчанию = "dq_api"
-
xcom_push - булево значение
-
True - кладет результат в X-COM
-
False - в x-com ничего не записывается
-
по умолчанию = False
-
-
fail_if_negative - булево значение
-
True - в случае отрицательного статуса группы проверок таск будет "красным" (failed стасус в airflow)
-
False - таск всегда "зеленый"
-
по умолчанию = True
-
Пример вызова DQStatus:
from dq_plugins.dq_operators import DQRunner, DQStatus
run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09')
status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09')
run_dq >> status
Результат проверки доступен в XCom в таске dq_status при его завершение, самостоятельно прочесть статус можно оттуда.
Логи DQ
Оператор DQRunner сохраняет в лог Airfow уникальный идентификатор DQ задачи - run_id:
[2025-01-10 17:00:34,060] {dq_operators.py:203} INFO - Started run_dq job for group 'test_ETL_demo' and date '2025-01-09'
[2025-01-10 17:00:34,060] {dq_operators.py:205} INFO - DQ process run_id: 2cbb6b6b-8492-48eb-9cbd-f76e407ddd6c
Детальные логи о работе DQ, в том числе сообщения об ошибках при подсчете отдельных метрик, доступны в Kibana, где их можно найти по run_id конкретного таска DQ
Примеры синхронного вызова DQ
Запустить подсчет DQ метрик и не ждать завершения процесса
Синхронный вызов DQ в даге airflow
from dq_plugins.dq_operators import DQRunner, DQStatus start_task = DummyOperator( task_id="start_ETL_task", dag=dag ) end_task = DummyOperator( task_id="end_ETL_task", dag=dag ) run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09', wait=False) start_task >> run_dq >> end_task
Запустить подсчет DQ метрик, дождаться выполнения, получить статус
Синхронный вызов DQ в даге airflow
from dq_plugins.dq_operators import DQRunner, DQStatus start_task = DummyOperator( task_id="start_ETL_task", dag=dag ) end_task = DummyOperator( task_id="end_ETL_task", dag=dag ) run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09') status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09') start_task >> run_dq >> status >> end_task
Запустить подсчет DQ метрик, дождаться выполнения, получить статус
Синхронный вызов DQ в даге airflow
from dq_plugins.dq_operators import DQRunner, DQStatus start_task = DummyOperator( task_id="start_ETL_task", dag=dag ) run_dq = DQRunner(dag=dag, group='test_DQ_run', date='2025-01-09') status = DQStatus(dag=dag, group='test_DQ_run', date='2025-01-09', task_id='dq_check_status', xcom_push=True) def check_xcom(**kwargs): ti = kwargs['ti'] msg = ti.xcom_pull(task_ids='dq_check_status') next_task = 'success' for _, status in msg.items(): if status <= 0: next_task = 'fail' return next_task branching = BranchPythonOperator(task_id='branching', python_callable=check_xcom, dag=dag, provide_context=True) start_task >> run_dq >> status >> branching >> [success, fail]
Значения статуса группы проверок (check_status)
Значения статуса группы проверок, доступные в xcom и обрабатываемые в таске Check_status имеют следующую семантику:
| check_status | Описание |
|---|---|
| 1 | Проверки прошли успешно |
| 0 | Значение референсной метрики не найдено в базе DQ. Скорее всего референсная метрика не была посчитаны за дату проверки |
| -1 | Проверки прошли не успешно (несоответствие пороговым значениям из правила проверки) |
| -2 | Значение метрики не найдено в базе DQ. Скорее всего метрики еще не были посчитаны за запрашиваемую дату |



