◾️はじめに
Airflow の DAGが失敗した場合、 別DAGをPauseする方法について考える
目次
【0】Airflow環境設定 【1】airflow.models.dag.DagModel.set_is_paused() を使う 【2】Airflow CLIのairflow dag pause を使う 【3】Airflow REST API を使う
【0】Airflow環境設定
* 以下の関連記事を参照のこと
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
【1】airflow.models.dag.DagModel.set_is_paused() を使う
from airflow import DAG from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.models import DagModel from datetime import datetime def pause_dag(context): dag_id_to_pause = "dag_b" print(f"DAG {dag_id_to_pause} is going to be paused...") try: dag_model = DagModel.get_dagmodel(dag_id_to_pause) if not dag_model.is_paused: dag_model.set_is_paused(is_paused=True) print(f"Pause DAG {dag_id_to_pause}") else: print(f"DAG {dag_id_to_pause} is already paused.") except Exception as e: print(f"Error: {e}") default_args = { 'owner': 'airflow', 'on_failure_callback': pause_dag } # ----------------------------- # DAG A 定義 # ----------------------------- with DAG( dag_id="dag_a", start_date=datetime(2026, 1, 24), schedule="@daily", catchup=False, default_args=default_args, ) as dag_a: task1_1 = BashOperator( task_id="task1_1", bash_command='echo "success from task1_1"', ) def fail_task(): raise Exception("Intentional Failure") fail = PythonOperator( task_id="fail", python_callable=fail_task, ) task1_2 = BashOperator( task_id="task1_2", bash_command='echo "success from task1_2"', ) task1_1 >> fail >> task1_2 with DAG( dag_id="dag_b", start_date=datetime(2026, 1, 24), schedule="@daily", catchup=False, ) as dag_b: task2_1 = BashOperator( task_id="task2_1", bash_command='echo "success from task2_1"', ) task2_2 = BashOperator( task_id="task2_2", bash_command='echo "success from task2_2"', ) task2_1 >> task2_2
1)使用上の注意
* Airflow 3.0 では使えないっぽい
メッセージ
Direct database access via the ORM is not allowed in Airflow 3.0
【2】Airflow CLIのairflow dag pause を使う
* 以下の関連記事で取り扱ったAirflow CLIのairflow dag pause を使う
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
task1_2 = BashOperator(
task_id="task1_2",
bash_command="airflow dags pause world_v3",
)
【3】Airflow REST API を使う
import requests import os TOKEN = os.environ["AIRFLOW_API_TOKEN"] def pause_dag(context): r = requests.patch( "http://airflow-apiserver:8080/api/v2/dags/dag_b", headers={ "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json", }, json={"is_paused": True}, timeout=10, ) r.raise_for_status()
関連記事
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ DAG作成 ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ DAG完了後に別DAG実行を考える ~
https://dk521123.hatenablog.com/entry/2026/01/23/222936