■ はじめに
https://dk521123.hatenablog.com/entry/2022/01/13/101634
の「実行タイミングに関わる引数」で、 「start_date」「end_date」「schedule_interval」「catchup」「timetable」 について扱ったが、掘り下げてみる。
目次
【0】Airflowの実行タイミングについて 1)サンプル 【1】start_date 1)使用上の注意 【2】end_date 【3】schedule_interval 1)記述方法 2)使用上の注意 【4】catchup 【5】timetable 【6】その他・注意事項 1)Airflow の pause 機能を使った際の挙動
【0】Airflowの実行タイミングについて
* 基本的には、「start_date から end_date まで、 schedule_interval の間隔で実行」 => start_date + schedule_interval で実行 * 以下のサイトの説明が分かりやすい。
Apache Airflowでエンドユーザーのための機械学習パイプラインを構築する Part4 | by piqcy | programming-soda | Medium
1)サンプル
import os from datetime import datetime from datetime import timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy import DummyOperator def say_hello(**context): current_datetime = datetime.now() print(f"Hello world. now = {current_datetime}") with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval=timedelta(days=1), start_date=datetime.now(), catchup=False ) as dag: task1 = DummyOperator(task_id="task1") task2 = PythonOperator( task_id='task2', dag=dag, python_callable=say_hello, ) task1 >> task2
【1】start_date
* ワークフローの開始日付 => スケジューラーが遡って実行を試みるタイムスタンプ
データ型 / デフォルト値
* Optional[datetime] << ★「1)使用上の注意」の[1]も参照 * Default : None
1)使用上の注意
[1] start_date は 設定必須
* 前述で「Optional[datetime]」とあるが、 あくまでDAG引数としてだけで 少なくとも MWAA (Apache Airflow v2.0.2) では、 start_date を DAGのコンストラクタ引数 又は 各Operatorに設定しておかないと 例外「AirflowException: Task is missing the start_date parameter」になる => 基本的には、DAGのコンストラクタ引数 start_date を 設定するようにした方がよさそう
[2] 過去日付を設定した場合、遡って実行
* start_date に対して、過去日付を入れると遡って実行される => それを防ぐために、 「catchup=False」(後述「【4】catchup」参照) を設定する
[3] 実行すると変更できない
* 詳細は、以下のサイト参照。
【2】end_date
* ワークフローの終了日付 => スケジューラーが実行する最終日付のタイムスタンプ => 「A date beyond which your DAG won’t run」とあるので 終了日付まで実行し、その日付を超えた場合、実行されない
データ型 / デフォルト値
* Optional[datetime] * Default : None
【3】schedule_interval
* ワークフローの実行間隔 => 設定方法は、後述「1)記述方法」を参照のこと。
データ型 / デフォルト値
* ScheduleIntervalArg => datetime.timedelta or dateutil.relativedelta.relativedelta * Default : ScheduleIntervalArgNotSet
1)記述方法
[1] datetime.timedeltaオブジェクト
* datetime.timedelta を使って指定する
https://docs.python.org/ja/3/library/datetime.html#timedelta-objects
* 設定例:schedule_interval=timedelta(days=1) * 時間を設定したい場合は、start_dateを以下のようにすると実行される
例:datetime.timedelta 指定で時間を設定したい場合
# UTC時間「10:30:00.000」で、一日ごとに開始する場合 # days_ago のソースを参照(https://airflow.apache.org/docs/apache-airflow/1.10.3/_modules/airflow/utils/dates.html) with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval=timedelta(days=1), start_date=days_ago(1, hour=10, minute=30, second=0, microsecond=0) catchup=False ) as dag:
[2] cron形式
* cron形式(* * * * * で「分」「時」「日」「月」「曜日」)で指定 * 設定例:schedule_interval="0 1 * * *" (毎日01:00) * 以下のサイトのように「月末に実行」だったり応用が利きそう
https://qiita.com/que9/items/b1daa478cc3333f29d15
[3] cron プリセット
* 以下の表のように”@[期間]”で設定可能 * 設定例:schedule_interval="@daily"
# | preset | meaning | cron |
---|---|---|---|
1 | @once | 一度だけ実行 | - |
2 | @hourly | 1時間毎に実行 | 0 * * * * |
3 | @daily | 1日一度だけ0:00に実行 | 0 0 * * * |
4 | @weekly | 週に一度だけ日曜の0:00に実行 | 0 0 * * 0 |
5 | @monthly | 月に一度だけ各月の01日の0:00に実行 | 0 0 1 * * |
6 | @yearly | 年に一度だけ01/01の0:00に実行 | 0 0 1 1 * |
https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#cron-presets
[4] スケジューリングしたくない場合
* Noneを設定する * 設定例:schedule_interval=None
2)使用上の注意
[1] start_date により、うまくいかなかったケースがあった
# MWAA(Airflow v2.0.2) においてうまくいかなかったケースがあったのでメモ # => Airflowのスケジュールを使用する際は、十分にテストした方がよさそう # うまくいかなかった(なぜ?気のせい??)ケース with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval="10 * * * *", start_date=datetime.now(), # datetime.now() catchup=False ) as dag: # うまくいった with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval="10 * * * *", start_date=days_ago(1), catchup=False ) as dag:
【4】catchup
* 前回実行時点まで遡って実行するかどうか => 前述したが、過去分の実行したくない場合、 「catchup=False」で設定する cf. catch up = 追いかける、追いつく
データ型 / デフォルト値
* bool * Default : conf.getboolean('scheduler', 'catchup_by_default')
【5】timetable
* 実行開始タイミングが複雑で、 独自のタイミングで実行したい場合に使用。 => 以下、公式サイト「Customizing DAG Scheduling with Timetables」 を見るとイメージ湧くかと、、、(必要に駆られたらブログにまとめるかも)
https://airflow.apache.org/docs/apache-airflow/2.2.0/howto/timetable.html
データ型 / デフォルト値
* Optional[Timetable] * Default : None
【6】その他・注意事項
1)Airflow の pause 機能を使った際の挙動
* Airflow の pause 機能(一時停止状態。Web UIのDAGの右にあるトグルボタン) を使った際に、一時停止をして再度再開した場合、挙動
「catcup=False」の場合
* 一時停止した間に行うはずだった直近のDAGが即実行される => 以下のサイトの「catcup=False にまつわるやらかしで多いのが、 pause 解除と同時にDAGが動き出す」も参照。
https://kencharos.hatenablog.com/entry/2020/05/07/161558
「catcup=True」の場合
* 一時停止した間に行うはずだったDAGが全て即実行される
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ DAGの引数 ~
https://dk521123.hatenablog.com/entry/2022/01/13/101634
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840