【Airflow】Apache Airflow ~ How to Pause Another DAG After a DAG Fails ~

◾️Introduction

This article looks at how to pause another DAG when an Airflow DAG fails.

Table of Contents

【0】Airflow Environment Setup
【1】Using airflow.models.dag.DagModel.set_is_paused()
【2】Using the Airflow CLI "airflow dags pause"
【3】Using the Airflow REST API

【0】Airflow Environment Setup

* See the related article below

Apache Airflow ~ Environment Setup / Docker ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840

【1】Using airflow.models.dag.DagModel.set_is_paused()

https://airflow.apache.org/docs/apache-airflow/2.1.0/_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.set_is_paused

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.models import DagModel
from datetime import datetime


def pause_dag(context):
    """Failure callback: pause dag_b by updating the DAG model in the metadata DB."""
    dag_id_to_pause = "dag_b"
    print(f"DAG {dag_id_to_pause} is going to be paused...")

    try:
        dag_model = DagModel.get_dagmodel(dag_id_to_pause)
        if dag_model is None:
            print(f"DAG {dag_id_to_pause} was not found.")
        elif not dag_model.is_paused:
            dag_model.set_is_paused(is_paused=True)
            print(f"Paused DAG {dag_id_to_pause}")
        else:
            print(f"DAG {dag_id_to_pause} is already paused.")
    except Exception as e:
        print(f"Error: {e}")

# on_failure_callback set in default_args applies to every task in DAG A
default_args = {
    'owner': 'airflow',
    'on_failure_callback': pause_dag
}


# -----------------------------
# DAG A definition
# -----------------------------
with DAG(
    dag_id="dag_a",
    start_date=datetime(2026, 1, 24),
    schedule="@daily",
    catchup=False,
    default_args=default_args,
) as dag_a:

    task1_1 = BashOperator(
        task_id="task1_1",
        bash_command='echo "success from task1_1"',
    )

    def fail_task():
        raise Exception("Intentional Failure")

    fail = PythonOperator(
        task_id="fail",
        python_callable=fail_task,
    )

    task1_2 = BashOperator(
        task_id="task1_2",
        bash_command='echo "success from task1_2"',
    )

    task1_1 >> fail >> task1_2


# -----------------------------
# DAG B definition (the DAG to be paused)
# -----------------------------
with DAG(
    dag_id="dag_b",
    start_date=datetime(2026, 1, 24),
    schedule="@daily",
    catchup=False,
) as dag_b:

    task2_1 = BashOperator(
        task_id="task2_1",
        bash_command='echo "success from task2_1"',
    )
    task2_2 = BashOperator(
        task_id="task2_2",
        bash_command='echo "success from task2_2"',
    )

    task2_1 >> task2_2

1) Usage notes

* This approach does not appear to work in Airflow 3.0, which blocks direct ORM access; use the CLI in 【2】 or the REST API in 【3】 instead.

Error message

Direct database access via the ORM is not allowed in Airflow 3.0

【2】Using the Airflow CLI "airflow dags pause"

* Use the Airflow CLI command "airflow dags pause", which is covered in the related article below

Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702

    # BashOperator that pauses dag_b via the Airflow CLI
    task1_2 = BashOperator(
        task_id="task1_2",
        bash_command="airflow dags pause dag_b",
    )
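
If you want to pause dag_b from the failure callback (as in 【1】) rather than from a task inside the DAG, one option is to run the CLI from the callback itself. The following is a minimal sketch, assuming the "airflow" command is available on the worker's PATH; wire it into default_args via on_failure_callback as in 【1】.

import subprocess


def pause_dag(context):
    # Sketch: pause dag_b by running the Airflow CLI from the failure callback
    # (assumes the "airflow" command is on the worker's PATH)
    result = subprocess.run(
        ["airflow", "dags", "pause", "dag_b"],
        capture_output=True,
        text=True,
    )
    print(result.stdout)
    if result.returncode != 0:
        print(f"Failed to pause dag_b: {result.stderr}")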

【3】Using the Airflow REST API

import requests
import os

# API token for the Airflow REST API, passed in via an environment variable
TOKEN = os.environ["AIRFLOW_API_TOKEN"]


def pause_dag(context):
    # Failure callback: pause dag_b via the REST API (Airflow 3.x / v2 endpoint)
    r = requests.patch(
        "http://airflow-apiserver:8080/api/v2/dags/dag_b",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type": "application/json",
        },
        json={"is_paused": True},
        timeout=10,
    )
    r.raise_for_status()
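
For Airflow 2.x, the same update can go through the stable REST API (v1). Below is a minimal sketch; the host name and the airflow/airflow credentials are assumptions based on the Docker setup, and it requires basic auth to be enabled.

import requests


def pause_dag(context):
    # Sketch: pause dag_b via the Airflow 2.x stable REST API (v1)
    # (assumes basic auth is enabled and an airflow/airflow user exists)
    r = requests.patch(
        "http://airflow-webserver:8080/api/v1/dags/dag_b",
        auth=("airflow", "airflow"),
        json={"is_paused": True},
        timeout=10,
    )
    r.raise_for_status()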

Related Articles

Apache Airflow ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Environment Setup / Docker ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ Creating DAGs ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Running Another DAG After a DAG Completes ~
https://dk521123.hatenablog.com/entry/2026/01/23/222936