Что такое DAG в контексте Apache Airflow?

DAG (Directed Acyclic Graph — направленный ацикличный граф) в контексте Apache Airflow представляет собой основную структуру, определяющую порядок выполнения задач (tasks) в пайплайне обработки данных. Это ключевая концепция, вокруг которой строится вся логика работы Apache Airflow. DAG описывает, какие задачи нужно выполнить, в каком порядке, с какими зависимостями, и как часто их нужно запускать. Он определяет граф задач, где каждая вершина — это задача (Task), а ребро — зависимость между задачами.

Основные характеристики DAG:

  1. Directed (направленный)
    Поток исполнения идет в одном направлении — от upstream задач к downstream. Это означает, что одна задача зависит от результата другой и выполняется после неё.

  2. Acyclic (ацикличный)
    Граф не может содержать циклов. Это исключает возможность зацикливания выполнения — задача не может зависеть от самой себя ни напрямую, ни через цепочку других задач.

  3. Graph (граф)
    Математическая структура, состоящая из узлов (tasks) и рёбер (dependencies), позволяющая описать сложные зависимости и параллелизм.

Что такое DAG в Airflow на практике

В Apache Airflow DAG — это Python-скрипт, в котором:

  • Определяется сам граф DAG.

  • Устанавливаются расписание (schedule), настройки и метаданные.

  • Описываются зависимости между задачами.

  • Определяются конкретные задачи (Tasks), обычно с помощью операторов (Operators).

Пример базового DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="simple_dag_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
catchup=False
) as dag:
task1 = BashOperator(
task_id="print_date",
bash_command="date"
)
task2 = BashOperator(
task_id="print_hello",
bash_command="echo Hello"
)
task3 = BashOperator(
task_id="print_world",
bash_command="echo World"
)
task1 >> \[task2, task3\] # task2 и task3 зависят от task1

Здесь:

  • dag_id — имя DAG-а.

  • start_date — дата начала планирования.

  • schedule_interval — интервал между запусками (ежедневно).

  • task1, task2, task3 — задачи (узлы графа).

  • >> — определяет порядок: task1 → task2 и task1 → task3.

Составные элементы DAG-а

  1. DAG Definition
    Объект DAG содержит основную конфигурацию: имя, расписание, дата начала, параметры ретраев, SLA, настройки email-уведомлений, и т.д.

  2. Tasks
    Определяются через Operators, например PythonOperator, BashOperator, HttpSensor, EmailOperator, DummyOperator, SqlOperator и др. Они представляют собой конкретную единицу работы.

  3. Task Dependencies
    Используются операторы >> и <<, а также методы set_upstream() и set_downstream(). Эти зависимости и формируют структуру DAG.

  4. Schedule Interval
    Указывает, как часто должен выполняться DAG (@daily, @hourly, cron-формат и т.п.).

  5. Start Date и Catchup
    Управляют тем, с какой даты начнётся выполнение DAG, и будут ли запускаться пропущенные интервалы (catchup=True/False).

  6. Default Arguments (default_args)
    Словарь с настройками по умолчанию для всех задач: retries, retry_delay, email_on_failure, owner и пр.

Принцип работы DAG в Airflow

  • Airflow запускает планировщик (Scheduler), который периодически сканирует DAG-файлы.

  • Если приходит время для нового запуска DAG согласно schedule_interval, создается run DAG-а.

  • Scheduler создает задачи (Task Instances), выставляет их в очередь, в зависимости от зависимостей и состояний предыдущих задач.

  • Executor (например, LocalExecutor, CeleryExecutor, KubernetesExecutor) запускает задачи.

  • Worker (или Pod, если используется Kubernetes) исполняет задачу.

Динамические DAG-и

Так как DAG описывается на Python, его можно строить динамически — например, создавать задачи в цикле или по шаблону:

for i in range(5):
BashOperator(
task_id=f"print_{i}",
bash_command=f"echo {i}",
dag=dag
)

DAG Run и Task Instance

  • DAG Run — это конкретное выполнение DAG-а, связанное с определённой датой запуска.

  • Task Instance — конкретный запуск одной задачи внутри DAG Run, имеет состояние: queued, running, success, failed, skipped, up_for_retry и др.

Примеры продвинутого использования:

  • Branching: использование BranchPythonOperator для выбора пути выполнения.

  • SubDAGs: вложенные DAG-и для повторно используемых частей графа (устаревший подход, лучше использовать TaskGroups).

  • TaskGroup: логическая группировка задач в один блок, упрощает визуализацию.

  • Sensor: специальный тип задач, ожидающих внешнее условие (например, появление файла в S3).

Визуализация DAG

Airflow предоставляет Web-интерфейс, где можно видеть:

  • Граф задач.

  • Историю выполнений.

  • Логи задач.

  • Состояние выполнения (с подсветкой: зелёный — success, красный — failed, серый — skipped).

  • Интерактивное управление (рестарт, принудительный запуск, пропуск и т.д.).

Практические сценарии использования DAG

  • Пайплайн ETL: extract → transform → load.

  • ML pipeline: подготовка данных → обучение модели → валидация → деплой.

  • Интеграция с внешними API: опрос API → загрузка в DWH.

  • Агрегация логов: сбор из S3 → нормализация → запись в Elasticsearch.

  • Генерация отчётов и отправка по почте или Telegram.

DAG в Apache Airflow — это не просто структура кода, а декларация всего процесса выполнения данных, включая логику, расписание, порядок и зависимость задач. Это фундаментальный объект, управляющий тем, как, когда и в какой последовательности обрабатываются данные.