【Airflow】Apache Airflow ~ Catchup ~

■ はじめに

https://dk521123.hatenablog.com/entry/2024/10/05/234219

の続き。

前回の記事の内容が、ほとんど「Backfill」だったので
今回は、「Catchup」を集中で取り上げる

目次

【1】Catchup
【2】設定方法
 1)DAG
 2)airflow.cfg
【3】Hello World for Catchup
 1)Airflow環境
 2)DAG作成
 3)動作確認
【4】Airflow の pause 機能を使った際の挙動
 1)「catcup=False」の場合
 2)「catcup=True」の場合

【1】Catchup

* Airflow のスケジューラーは、start_date/end_date/schedule_intervalにより
 DAG Runを生成して実行するが、
 未実行の(あるいはクリアされている)データ区間をすべて実行することを
 Catchup(キャッチアップ)と呼ぶ

cf. Catchup = 遅れを取り戻す

https://airflow.apache.org/docs/apache-airflow/2.6.0/core-concepts/dag-run.html#catchup

【2】設定方法

1)DAG

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
    # ★注目:catchup=False の場合、過去日付の DAG 実行を無効★
    catchup=True,
)

2)airflow.cfg

* 設定ファイル「airflow.cfg」でも設定可能

catchup_by_default

* デフォルトのcatchupを指定する
 => catchup_by_default=False した場合、
  デフォルトでDAGのcatchupがFalseに設定
* Default = True
* 環境変数「AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT」

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#catchup-by-default

【3】Hello World for Catchup

1)Airflow環境

* 以下の関連記事を参照のこと

Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840

2)DAG作成

airflow-docker/dags/demo_dag2.py

import airflow
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import pendulum

default_args = {
  "owner": "airflow"
}
@dag(
  'demo_dag2',
  description="A simple DAG",
  default_args=default_args,
  schedule_interval="@daily",
  # ★注目★
  catchup=True,
  start_date=pendulum.datetime(2024, 9, 15, tz="UTC"),
  # https://future-architect.github.io/articles/20200131/
  concurrency=2, # task は 2つまで同時起動できる
  max_active_runs=1, # DAG は 1つまで同時起動できる
  tags=['example']
)
def main_dag():
  @task
  def get_hello_world():
     return 'hello world'

  @task
  def say_hello(result):
    print(result)

  result = get_hello_world()
  say_hello(result)

main_dag = main_dag()

3)動作確認

[1] ブラウザでAirflow Web UIにアクセス

http://localhost:8080/

[2] DAG ID「demo_dag2」の「Pause/Unpause DAG」をONにする

 => ONにすると、start_dateに指定した日時 (今回の場合、「2024/09/15」)
  から現在日時(今回の場合、「2024/10/06」)まで実行されていく

【4】Airflow の pause 機能を使った際の挙動

* Airflow の pause 機能(一時停止状態。Web UIのDAGの右にあるトグルボタン)
 を使った際に、一時停止をして再度再開した場合、挙動

1)「catcup=False」の場合

* 一時停止した間に行うはずだった直近のDAGが即実行される
 => 以下のサイトの「catcup=False にまつわるやらかしで多いのが、
  pause 解除と同時にDAGが動き出す」も参照。

https://kencharos.hatenablog.com/entry/2020/05/07/161558

2)「catcup=True」の場合

* 一時停止した間に行うはずだったDAGが全て即実行される

参考文献

https://analytics.livesense.co.jp/entry/2018/02/06/132842
https://kencharos.hatenablog.com/entry/2020/05/07/161558

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Backfill ~
https://dk521123.hatenablog.com/entry/2024/10/05/234219
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020