【Airflow】Apache Airflow ~ 実行タイミング ~

■ はじめに

https://dk521123.hatenablog.com/entry/2022/01/13/101634

の「実行タイミングに関わる引数」で、
「start_date」「end_date」「schedule_interval」「catchup」「timetable」
について扱ったが、掘り下げてみる。

目次

【0】Airflowの実行タイミングについて
 1)サンプル
【1】start_date
 1)使用上の注意
【2】end_date
【3】schedule_interval
 1)記述方法
 2)使用上の注意
【4】catchup
【5】timetable
【6】その他・注意事項
 1)Airflow の pause 機能を使った際の挙動

【0】Airflowの実行タイミングについて

* 基本的には、「start_date から end_date まで、 schedule_interval の間隔で実行」
 => start_date + schedule_interval で実行
* 以下のサイトの説明が分かりやすい。

Apache Airflowでエンドユーザーのための機械学習パイプラインを構築する Part4 | by piqcy | programming-soda | Medium

Airflow の流れを制す - kencharosの日記

1)サンプル

import os
from datetime import datetime
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

def say_hello(**context):
  current_datetime = datetime.now()
  print(f"Hello world. now = {current_datetime}")

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  schedule_interval=timedelta(days=1),
  start_date=datetime.now(),
  catchup=False
) as dag:
  task1 = DummyOperator(task_id="task1")
  task2 = PythonOperator(
    task_id='task2',
    dag=dag,
    python_callable=say_hello,
  )
  task1 >> task2

【1】start_date

* ワークフローの開始日付
 => スケジューラーが遡って実行を試みるタイムスタンプ

データ型 / デフォルト値

* Optional[datetime] << ★「1)使用上の注意」の[1]も参照
* Default : None

1)使用上の注意

[1] start_date は 設定必須

* 前述で「Optional[datetime]」とあるが、
 あくまでDAG引数としてだけで
 少なくとも MWAA (Apache Airflow v2.0.2) では、
 start_date を DAGのコンストラクタ引数 又は 
 各Operatorに設定しておかないと
 例外「AirflowException: Task is missing the start_date parameter」になる
 => 基本的には、DAGのコンストラクタ引数 start_date を
  設定するようにした方がよさそう

[2] 過去日付を設定した場合、遡って実行

* start_date に対して、過去日付を入れると遡って実行される
 => それを防ぐために、
  「catchup=False」(後述「【4】catchup」参照)
  を設定する

[3] 実行すると変更できない

* 詳細は、以下のサイト参照。

https://future-architect.github.io/articles/20200131/#3-start-date-%E3%81%AE%E5%A4%89%E6%9B%B4%E3%81%8C%E3%81%A7%E3%81%8D%E3%81%AA%E3%81%84

【2】end_date

* ワークフローの終了日付
 => スケジューラーが実行する最終日付のタイムスタンプ
 => 「A date beyond which your DAG won’t run」とあるので
  終了日付まで実行し、その日付を超えた場合、実行されない

データ型 / デフォルト値

* Optional[datetime]
* Default : None

【3】schedule_interval

* ワークフローの実行間隔
 => 設定方法は、後述「1)記述方法」を参照のこと。

データ型 / デフォルト値

* ScheduleIntervalArg
 => datetime.timedelta or dateutil.relativedelta.relativedelta
* Default : ScheduleIntervalArgNotSet

1)記述方法

[1] datetime.timedeltaオブジェクト

* datetime.timedelta を使って指定する

https://docs.python.org/ja/3/library/datetime.html#timedelta-objects

* 設定例:schedule_interval=timedelta(days=1)
* 時間を設定したい場合は、start_dateを以下のようにすると実行される

例:datetime.timedelta 指定で時間を設定したい場合

# UTC時間「10:30:00.000」で、一日ごとに開始する場合
# days_ago のソースを参照(https://airflow.apache.org/docs/apache-airflow/1.10.3/_modules/airflow/utils/dates.html)
with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  schedule_interval=timedelta(days=1),
  start_date=days_ago(1, hour=10, minute=30, second=0, microsecond=0)
  catchup=False
) as dag:

[2] cron形式

* cron形式(* * * * * で「分」「時」「日」「月」「曜日」)で指定
* 設定例:schedule_interval="0 1 * * *" (毎日01:00)
* 以下のサイトのように「月末に実行」だったり応用が利きそう

https://qiita.com/que9/items/b1daa478cc3333f29d15

[3] cron プリセット

* 以下の表のように”@[期間]”で設定可能
* 設定例:schedule_interval="@daily"
# preset meaning cron
1 @once 一度だけ実行 -
2 @hourly 1時間毎に実行 0 * * * *
3 @daily 1日一度だけ0:00に実行 0 0 * * *
4 @weekly 週に一度だけ日曜の0:00に実行 0 0 * * 0
5 @monthly 月に一度だけ各月の01日の0:00に実行 0 0 1 * *
6 @yearly 年に一度だけ01/01の0:00に実行 0 0 1 1 *

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#cron-presets

[4] スケジューリングしたくない場合

* Noneを設定する
* 設定例:schedule_interval=None

2)使用上の注意

[1] start_date により、うまくいかなかったケースがあった

# MWAA(Airflow v2.0.2) においてうまくいかなかったケースがあったのでメモ
#  => Airflowのスケジュールを使用する際は、十分にテストした方がよさそう

# うまくいかなかった(なぜ?気のせい??)ケース
with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  schedule_interval="10 * * * *",
  start_date=datetime.now(), # datetime.now()
  catchup=False
) as dag:

# うまくいった
with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  schedule_interval="10 * * * *",
  start_date=days_ago(1),
  catchup=False
) as dag:

【4】catchup

* 前回実行時点まで遡って実行するかどうか
 => 前述したが、過去分の実行したくない場合、
  「catchup=False」で設定する

cf. catch up = 追いかける、追いつく

データ型 / デフォルト値

* bool
* Default : conf.getboolean('scheduler', 'catchup_by_default')

【5】timetable

* 実行開始タイミングが複雑で、
 独自のタイミングで実行したい場合に使用。
 => 以下、公式サイト「Customizing DAG Scheduling with Timetables」
  を見るとイメージ湧くかと、、、(必要に駆られたらブログにまとめるかも)

https://airflow.apache.org/docs/apache-airflow/2.2.0/howto/timetable.html

データ型 / デフォルト値

* Optional[Timetable]
* Default : None

【6】その他・注意事項

1)Airflow の pause 機能を使った際の挙動

* Airflow の pause 機能(一時停止状態。Web UIのDAGの右にあるトグルボタン)
 を使った際に、一時停止をして再度再開した場合、挙動

「catcup=False」の場合

* 一時停止した間に行うはずだった直近のDAGが即実行される
 => 以下のサイトの「catcup=False にまつわるやらかしで多いのが、
  pause 解除と同時にDAGが動き出す」も参照。

https://kencharos.hatenablog.com/entry/2020/05/07/161558

「catcup=True」の場合

* 一時停止した間に行うはずだったDAGが全て即実行される

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ DAGの引数 ~
https://dk521123.hatenablog.com/entry/2022/01/13/101634
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840