【Airflow】Apache Airflow ~ リトライ ~

■ はじめに

Apache Airflow の リトライ について扱う。

目次

【1】手動でリトライ - 再実行
【2】airflow.cfgでの制御 - Airflow全体の設定
 1)default_task_retries
 2)max_db_retries
【3】タスクのリトライ関連のプロパティ
 1)retries
 2)retry_delay
 3)retry_exponential_backoff
 4)max_retry_delay
 5)on_retry_callback
 6)使用イメージ
【4】サンプル
 例1:実験コード

【1】手動でリトライ - 再実行

* Web UI や コマンド を通して、可能
 => 途中のタスクから実行など細かく再実行できる
 => 詳細は、以下のサイトを参照。

https://future-architect.github.io/articles/20200131/#2-%E3%83%AA%E3%83%88%E3%83%A9%E3%82%A4
https://blog.imind.jp/entry/2019/02/22/000049

【2】airflow.cfgでの制御 - Airflow全体の設定

1)default_task_retries
2)max_db_retries

1)default_task_retries

* デフォルトのタスクのリトライ回数

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#default-task-retries

New in version 1.10.6.

The number of retries each task is going to have by default.
Can be overridden at dag or task level.

2)max_db_retries

* v2.0.0から
* DB Operational Errorsの場合のリトライ回数

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-db-retries

New in version 2.0.0.

Number of times the code should be retried in case of DB Operational Errors.
Not all transactions will be retried as it can cause undesired state.
Currently it is only used in DagFileProcessor.process_file to retry dagbag.sync_to_db.

【3】タスクのリトライ関連のプロパティ

* タスクのリトライに関して、関連する設定は、以下の通り。

1)retries
2)retry_delay
3)retry_exponential_backoff
4)max_retry_delay
5)on_retry_callback

https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/operators/index.html#package-contents

1)retries

* リトライ回数

API仕様より

retries (int)
 – the number of retries that should be performed
  before failing the task
 – タスクが失敗になる前に、実行されるリトライ回数

2)retry_delay

* リトライ時の遅延時間

API仕様より

retry_delay (datetime.timedelta)
 – delay between retries
 – リトライ間の遅延

3)retry_exponential_backoff

* 指数関数的後退アルゴリズム(※ 補足1参照)を使った
 リトライを行うかどうか

API仕様より

retry_exponential_backoff (bool)
 – allow progressive longer waits between retries
  by using exponential backoff algorithm on retry delay 
 (delay will be converted into seconds)
 – リトライ遅延での指数関数的後退アルゴリズムによる
 リトライ間隔の待ち時間を増加させるかどうか

※ 補足1:指数関数的後退アルゴリズム / exponential backoff algorithm

* 指数関数的後退アルゴリズム
 => リトライ回数が増えるごとに待ち時間を指数関数的に増やすアルゴリズム
 => 例:0.5秒、1秒、2秒、4秒、8秒, ... <= 固定じゃない
* 詳細は、以下のサイトを参照。

https://codezine.jp/article/detail/10739
https://note.com/artrigger_jp/n/n0795148b062d
https://yoshidashingo.hatenablog.com/entry/2014/08/17/135017

4)max_retry_delay

* リトライ間の最大遅延間隔(「2)retry_delay」とどう違う?)

API仕様より

max_retry_delay (datetime.timedelta)
 – maximum delay interval between retries
 – リトライ間の最大遅延間隔

5)on_retry_callback

* リトライ時のコールバック関数

API仕様より

on_retry_callback (callable)
 – much like the on_failure_callback
  except that it is executed when retries occur.
 – on_failure_callbackのような リトライ発生時のコールバック関数

* on_failure_callback については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

6)使用イメージ

* 「デフォルトでの設定」と「タスク個別に設定」することが可能

イメージ

from datetime import timedelta

# デフォルトでの設定例
default_args = {
  "provide_context": True,
  "retries": 1,
  "retry_exponential_backoff ": True,
  "retry_delay": timedelta(seconds=10)
}

# オペレータ個別による設定例
task1 = PythonOperator(
    task_id='task1',
    python_callable=say_hello,
    retries=3,
    retry_exponential_backoff=False,
    dag=dag
)

【4】サンプル

例1:実験コード

import os
import time
import datetime
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。10秒寝ます Zzz ... {datetime.datetime.now()}")
  time.sleep(10)
  print(f"Done ... {datetime.datetime.now()}")

def failed_on_purpose(**context):
  print(f"これからわざとエラーにします ... {datetime.datetime.now()}")
  raise Exception("Error...on purpose...")

def call_back_for_retry(**context):
  print(f"コールバック関数がコールされた ... {datetime.datetime.now()}")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "retries": 5,
  "retry_exponential_backoff ": True,
  "retry_delay": timedelta(seconds=10),
  "max_retry_delay": timedelta(seconds=25),
  "on_retry_callback": call_back_for_retry
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task1',
    dag=dag,
    python_callable=say_hello,
  )
  job2 = PythonOperator(
    task_id='say_hello_task2',
    dag=dag,
    # ★エラーを起こすメソッドを設定
    python_callable=failed_on_purpose,
    # retries=3,
    # retry_exponential_backoff=False
  )
  job1 >> job2

参考文献

https://qiita.com/munaita_/items/7fe474369a190d8d4ee6

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703