【Airflow】Apache Airflow ~ タイムアウト ~

■ はじめに

Apache Airflow の タイムアウト について扱う。

目次

【1】DAGのタイムアウト関連のプロパティ
 1)dagrun_timeout
 2)dag_file_processor_timeout
 3)dagbag_import_timeout
 使用上の注意
 使用イメージ
【2】タスクのタイムアウト関連のプロパティ
 1)execution_timeout
 使用イメージ
【3】サンプル
 例1:dagrun_timeoutの試験コード
 例2:execution_timeoutの試験コード

【1】DAGのタイムアウト関連のプロパティ

1)dagrun_timeout

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html#airflow.models.DAG

より抜粋
~~~~~~~~~~~~~~
dagrun_timeout (datetime.timedelta)
 -- specify how long a DagRun should be up before timing out / failing,
  so that new DagRuns can be created.
  The timeout is only enforced for scheduled DagRuns.
 -- 新しいDagRunインスタンス を生成するために
 DagRun がどの位で立ち上がり タイムアウト / 失敗にするかを指定する
 タイムアウトは、スケジュールされたDagRunsのためだけに強制される
~~~~~~~~~~~~~~

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#timeouts

より抜粋
~~~~~~~~~~~~~~
execution_timeout controls the maximum time allowed for every execution.
If execution_timeout is breached, the task times out and AirflowTaskTimeout is raised.
~~~~~~~~~~~~~~
 => タイムアウト時に例外「AirflowException」が発生する

2)dag_file_processor_timeout

* DAGファイルプロセス DagFileProcessor 時のタイムアウト

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-file-processor-timeout

New in version 1.10.6.

How long before timing out a DagFileProcessor, which processes a dag file

3)dagbag_import_timeout

* Pythonファイルインポートでのタイムアウト

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dagbag-import-timeout

How long before timing out a python file import

使用上の注意

* Airflow v1系は、Timeoutされない?(詳細は以下のサイト参照)

https://stackoverflow.com/questions/57110885/how-to-define-a-timeout-for-apache-airflow-dags

* なお、 v2系(v2.0.2 in AWS MWAA)で試したところうまく動いている

使用イメージ

* DAGインスタンスを生成(New)するときに指定する

イメージ

from datetime import timedelta

dag = DAG(
  'test_timeout',
  schedule_interval=None,
  default_args=args,
  # ★注目
  dagrun_timeout=timedelta(seconds=20),
)

【2】タスクのタイムアウト関連のプロパティ

* タスクのタイムアウトに関して、関連する設定は、以下の通り。

1)execution_timeout

1)execution_timeout

https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/operators/index.html#package-contents

より抜粋
~~~~~~~~~
execution_timeout (datetime.timedelta)
 – max time allowed for the execution of this task instance,
  if it goes beyond it will raise and fail.
 – このタスクインスタンスの実行するための最大時間。
 もし超えた場合は例外が発生し失敗する
~~~~~~~~~
 => タイムアウト時に例外「AirflowTaskTimeout」が発生する

使用イメージ

* 「デフォルトでの設定」と「タスク個別に設定」することが可能

イメージ

from datetime import timedelta

# デフォルトでの設定例
default_args = {
  "provide_context": True,
  "execution_timeout": timedelta(seconds=10),
}

# オペレータ個別による設定例
task1 = PythonOperator(
    task_id='task1',
    python_callable=say_hello,
    execution_timeout=timedelta(seconds=3),
    dag=dag
)

【3】サンプル

例1:dagrun_timeoutの試験コード

import os
import time
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。10秒寝ます Zzz")
  # ★注目★
  time.sleep(10)
  print(f"Done")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  # ★注目:期待としては、Job2の途中でタイムアウトする
  dagrun_timeout=timedelta(seconds=15),
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task1',
    dag=dag,
    python_callable=say_hello,
  )
  job2 = PythonOperator(
    task_id='say_hello_task2',
    dag=dag,
    python_callable=say_hello,
  )
  job3 = PythonOperator(
    task_id='say_hello_task3',
    dag=dag,
    python_callable=say_hello,
  )
  job1 >> job2 >> job3

出力結果

ログより抜粋(タイムアウト時に例外「AirflowException」が発生する)

[2021-10-14 17:16:46,001] {taskinstance.py:1265} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-10-14 17:16:46,011] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
...略...
raise AirflowException("Task received SIGTERM signal")

例2:execution_timeoutの試験コード

import os
import time
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。30秒寝ます Zzz")
  # ★注目★
  time.sleep(30)
  print(f"Done")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  # ★注目★
  # Timeout時間が10秒で、上で30秒かかるのでタイムアウトするはず
  "execution_timeout": timedelta(seconds=10),
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task',
    dag=dag,
    python_callable=say_hello,
    # ★注目:下のコメントアウトを外すと、設定が上書きされてタイムアウトにならない★
    # execution_timeout=timedelta(seconds=40)
  )
  job1

出力結果

ログより抜粋(タイムアウト時に例外「AirflowTaskTimeout」が発生する)

[2021-10-15 18:29:40,238] {timeout.py:42} ERROR - Process timed out, PID: 999
[2021-10-15 18:29:40,238] {dagbag.py:259} ERROR - Failed to import: /root/airflow/dags/hello.py
Traceback (most recent call last):
...略...
  File "/root/airflow/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 999

参考文献

https://qiita.com/munaita_/items/7fe474369a190d8d4ee6

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703