【Airflow】Apache Airflow ~ タイムアウト ~

■ はじめに

Apache Airflow の タイムアウト について扱う。

目次

【1】DAGのタイムアウト関連のプロパティ
 1)dagrun_timeout
 2)dag_file_processor_timeout
 3)dagbag_import_timeout
 4)使用イメージ
【2】タスクのタイムアウト関連のプロパティ
 1)execution_timeout
 2)timeout
 3)使用イメージ
 4)使用上の注意
【3】サンプル
 例1:dagrun_timeoutの試験コード
 例2:execution_timeoutの試験コード
【4】使用上の注意
 1)Airflow v1系は、Timeoutされない可能性あり
 2)タイムアウトエラーの場合、リトライはされない

【1】DAGのタイムアウト関連のプロパティ

1)dagrun_timeout

* DAG の実行のタイムアウトを定義

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/models/dag.html#DAG

より抜粋
~~~~~~~~~~~~~~
dagrun_timeout (datetime.timedelta)
 -- specify how long a DagRun should be up before timing out / failing,
  so that new DagRuns can be created.
  The timeout is only enforced for scheduled DagRuns.
 -- 新しいDagRunインスタンス を生成するために
 DagRun がどの位で立ち上がり タイムアウト / 失敗にするかを指定する
 タイムアウトは、スケジュールされたDagRunsのためだけに強制される
~~~~~~~~~~~~~~

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#timeouts

より抜粋
~~~~~~~~~~~~~~
execution_timeout controls the maximum time allowed for every execution.
If execution_timeout is breached, the task times out
 and AirflowTaskTimeout is raised.
~~~~~~~~~~~~~~
 => タイムアウト時に例外「AirflowException」が発生する

2)dag_file_processor_timeout

* DAGファイルプロセス DagFileProcessor 時のタイムアウト

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-file-processor-timeout

New in version 1.10.6.

How long before timing out a DagFileProcessor, which processes a dag file

3)dagbag_import_timeout

* Pythonファイルインポートでのタイムアウト

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dagbag-import-timeout

How long before timing out a python file import

4)使用イメージ

* DAGインスタンスを生成(New)するときに指定する

イメージ

from datetime import timedelta

dag = DAG(
  'test_timeout',
  schedule_interval=None,
  default_args=args,
  # ★注目
  dagrun_timeout=timedelta(seconds=20),
)

【2】タスクのタイムアウト関連のプロパティ

* タスクのタイムアウトに関して、関連する設定は、以下の通り。

1)execution_timeout
2)timeout

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts

1)execution_timeout

* タスク実行のタイムアウト値

公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts

より抜粋
~~~~~~~~~
If you want a task to have a maximum runtime,
 set its execution_timeout attribute to a datetime.timedelta value
 that is the maximum permissible runtime. 
This applies to all Airflow tasks, including sensors.
execution_timeout controls the maximum time allowed for every execution.
If execution_timeout is breached, the task times out
 and AirflowTaskTimeout is raised.

タスクに最大実行時間を持たせたい場合は
execution_timeout 属性に datetime.timedelta 値を設定し、
許容される最大実行時間を設定します。
これは、センサーを含むすべてのAirflowタスクに適用されます。
execution_timeoutは、すべての実行に許容される最大時間を制御します。
execution_timeoutに違反した場合、タスクはタイムアウトし、
AirflowTaskTimeoutが発生します。
~~~~~~~~~

2)timeout

* reschedule モード時のセンサのタイムアウト値
 => reschedule モードやセンサについては、以下の関連記事を参照のこと

Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751

より抜粋
~~~~~~~~~~~
2)rescheduleモード

* センサーはチェックするときだけワーカースロットを占有し、
 その間は、Airflowのスケジュールの仕組みで待つので、
 待っている間は、他のタスクに割り当てて実行することが可能
~~~~~~~~~~~

公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts

より抜粋
~~~~~~~~~
In addition, sensors have a timeout parameter.
This only matters for sensors in reschedule mode.
timeout controls the maximum time allowed for the sensor to succeed.
If timeout is breached, AirflowSensorTimeout will be raised
 and the sensor fails immediately without retrying.

さらに、センサーは timeout パラメーターを持つ。
これはreschedule モードのセンサーにのみ関係します。
timeoutはセンサーが成功するまでの最大時間を制御する。
timeoutに違反した場合、AirflowSensorTimeoutが発生し、
センサーは再試行せずに即座に失敗する。
~~~~~~~~~

3)使用イメージ

* 「デフォルトでの設定」と「タスク個別に設定」することが可能

イメージ

from datetime import timedelta

# デフォルトでの設定例
default_args = {
  "provide_context": True,
  "execution_timeout": timedelta(seconds=10),
}

# オペレータ個別による設定例
task1 = PythonOperator(
    task_id='task1',
    python_callable=say_hello,
    execution_timeout=timedelta(seconds=3),
    dag=dag
)

# センサ個別による設定例
sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

4)使用上の注意

* 以下のサイトが凄くまとまっているので一読した方がいいかも。
 => 以下のサイトの通り、
 『汎用的な BaseOperator.execution_timeout を使うことがおすすめ』

https://qiita.com/munaita_/items/7fe474369a190d8d4ee6

[1] Sensor で execution_timeout だけでは賄えないケースがある

* Sensor に関しては、両方つけておいた方がいいかと、、、

[2] Sensor で execution_timeout / timeout を同時に使った場合は短い方が優先

# 以下の場合、、、
#  => 結論としては、Sensorについては、両方 execution_timeout / timeout
#   同じ値を入れておいた方がいいかも、、、
sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60), # ★短い60秒が優先される
    timeout=3600, # ★
    retries=2,
    mode="reschedule",
)

【3】サンプル

例1:dagrun_timeoutの試験コード

import os
import time
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。10秒寝ます Zzz")
  # ★注目★
  time.sleep(10)
  print(f"Done")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  # ★注目:期待としては、Job2の途中でタイムアウトする
  dagrun_timeout=timedelta(seconds=15),
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task1',
    dag=dag,
    python_callable=say_hello,
  )
  job2 = PythonOperator(
    task_id='say_hello_task2',
    dag=dag,
    python_callable=say_hello,
  )
  job3 = PythonOperator(
    task_id='say_hello_task3',
    dag=dag,
    python_callable=say_hello,
  )
  job1 >> job2 >> job3

出力結果

ログより抜粋(タイムアウト時に例外「AirflowException」が発生する)

[2021-10-14 17:16:46,001] {taskinstance.py:1265} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-10-14 17:16:46,011] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
...略...
raise AirflowException("Task received SIGTERM signal")

例2:execution_timeoutの試験コード

import os
import time
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。30秒寝ます Zzz")
  # ★注目★
  time.sleep(30)
  print(f"Done")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  # ★注目★
  # Timeout時間が10秒で、上で30秒かかるのでタイムアウトするはず
  "execution_timeout": timedelta(seconds=10),
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task',
    dag=dag,
    python_callable=say_hello,
    # ★注目:下のコメントアウトを外すと、設定が上書きされてタイムアウトにならない★
    # execution_timeout=timedelta(seconds=40)
  )
  job1

出力結果

ログより抜粋(タイムアウト時に例外「AirflowTaskTimeout」が発生する)

[2021-10-15 18:29:40,238] {timeout.py:42} ERROR - Process timed out, PID: 999
[2021-10-15 18:29:40,238] {dagbag.py:259} ERROR - Failed to import: /root/airflow/dags/hello.py
Traceback (most recent call last):
...略...
  File "/root/airflow/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 999

【4】使用上の注意

1)Airflow v1系は、Timeoutされない可能性あり

* Airflow v1系は、Timeoutされない?(詳細は以下のサイト参照)

https://stackoverflow.com/questions/57110885/how-to-define-a-timeout-for-apache-airflow-dags

* なお、 v2系(v2.0.2 in AWS MWAA)で試したところうまく動いている

2)タイムアウトエラーの場合、リトライはされない

MWAA-Local v2.6 で確認したところ、
Job内でタイムエラーになった場合、リトライは行われなかった
 => リトライに関しては、以下の関連記事を参照のこと

Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000

参考文献

https://qiita.com/munaita_/items/7fe474369a190d8d4ee6

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703