【Airflow】Apache Airflow ~ Backfill ~

■ はじめに

久しぶりに、Airflow について扱っており、
リカバリのために「Backfill」ってものを学んだので
メモしておく

目次

【1】予備知識:DAG
 1)start_date
 2)end_date
 3)schedule_interval
【2】Backfill
 1)backfill コマンド
 2)Hello World for Backfill
【3】使用上の注意
【4】Hello World for Backfill
 1)Airflow環境
 2)DAG作成
 3)backfill コマンドの実行
 4)動作確認

【1】予備知識:DAG

* 詳細は、以下の関連記事を参照のこと

Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005

1)start_date

* DAG の実行を開始する日時

2)end_date

* DAG の実行を停止する日時

3)schedule_interval

* 実行間隔

【2】Backfill

* 過去の特定の期間について、DAG を改めて実行する

cf. Backfill = 埋め戻し

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dag-run.html#backfill

https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#backfill

より抜粋
~~~~~~~~~~
Run subsections of a DAG for a specified date range.
指定された日付範囲におけるサブセクションのDAGを実行する

If reset_dag_run option is used,
 backfill will first prompt users whether airflow should clear all the previous dag_run and task_instances
 within the backfill date range.
もし、reset_dag_run オプションを使った場合
backfill は、そのbackfill が指定された日付範囲内で、
airflow が全ての前回のdag_run とtask_instancesをクリアするかどうかを
ユーザに促します

If rerun_failed_tasks is used,
 backfill will auto re-run the previous failed task instances within the backfill date range
もし、rerun_failed_tasks オプションを使った場合
backfill は、そのbackfill が指定された日付範囲内で、
自動的に前回エラーになったタスクインスタンスを再実行します。
~~~~~~~~~~

1)backfill コマンド

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#backfill

構文

airflow dags backfill [-h] [-c CONF] [--continue-on-failures]
                      [--delay-on-limit DELAY_ON_LIMIT] [--disable-retry] [-x]
                      [-n] [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL]
                      [--rerun-failed-tasks] [--reset-dagruns] [-B]
                      [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX]
                      [--treat-dag-as-regex] [--treat-dag-id-as-regex] [-v]
                      [-y]
                      dag_id
Options Explanations Defaults
-s, --start-date [YYYY-MM-DD] 開始日付 (Required) -
-e, --end-date [YYYY-MM-DD] 終了日付 (Required) -
--continue-on-failures 失敗時に続けるかどうか False
-n, --dry-run 失敗時に続けるかどうか False
-t, --task-regex [TASK_REGEX] 実行したい特定のTask IDの正規表現で指定する (optional) None

コマンド例

# airflow dags backfill \
#    --start-date <START_DATE (YYYY-MM-DD)> \
#    --end-date <END_DATE (YYYY-MM-DD)> \
#    <dag_id>
airflow dags backfill \
    --start-date 2024-10-02 \
    --end-date 2024-10-05 \
    demo_dag

# --continue-on-failures: 失敗時に続けるかどうか (False)
# -n, --dry-run: 失敗時に続けるかどうか (False)

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#backfill

【3】使用上の注意

* 日時は、UTC時間(JST/日本時間じゃない)

【4】Hello World for Backfill

1)Airflow環境

* 以下の関連記事を参照のこと

Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840

2)DAG作成

* どんなDAGでもいいが、以下を参照

airflow-docker/dags/sample_dag.py

import airflow
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import pendulum

default_args = {
  "owner": "airflow"
}
@dag(
  'demo_dag',
  description="A simple DAG",
  default_args=default_args,
  schedule_interval="@daily",
  start_date=pendulum.datetime(2024, 9, 15, tz="UTC"),
  catchup=False,
  # https://future-architect.github.io/articles/20200131/
  concurrency=2, # task は 2つまで同時起動できる
  max_active_runs=1, # DAG は 1つまで同時起動できる
  tags=['example']
)
def main_dag():
  @task
  def get_hello_world():
     return 'hello world'

  @task
  def say_hello(result):
    print(result)

  result = get_hello_world()
  say_hello(result)

main_dag = main_dag()

3)backfill コマンドの実行

# docker container id を調べる
$ docker ps
RTS                                         NAMES
f4f79f431872   apache/airflow:2.10.2   "/usr/bin/dumb-init …"   51 minutes ago   Up 47 minutes (healthy)     0.0.0.0:28080->8080/tcp, :::28080->8080/tcp   airflow-airflow-webserver-1

# docker container 内部に入る
$ docker exec -it f4f79f431872  bash

# backfill コマンドを実行する
$ airflow dags backfill \
    --start-date 2024-10-02 \
    --end-date 2024-10-05 \
    demo_dag

4)動作確認

* ブラウザでAirflow Web UIにアクセスし、
 DAG ID「demo_dag」の詳細を確認する

http://localhost:8080/

参考文献

https://qiita.com/pokoyakazan/items/822386429c2f05c0953d
https://qiita.com/_______________-/items/a590adbeb0f89232e4f0

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Catchup ~
https://dk521123.hatenablog.com/entry/2024/10/06/205302
Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020