■ はじめに
Apache Airflow の タイムアウト について扱う。
目次
【1】DAGのタイムアウト関連のプロパティ 1)dagrun_timeout 2)dag_file_processor_timeout 3)dagbag_import_timeout 使用上の注意 使用イメージ 【2】タスクのタイムアウト関連のプロパティ 1)execution_timeout 使用イメージ 【3】サンプル 例1:dagrun_timeoutの試験コード 例2:execution_timeoutの試験コード
【1】DAGのタイムアウト関連のプロパティ
1)dagrun_timeout
より抜粋 ~~~~~~~~~~~~~~ dagrun_timeout (datetime.timedelta) -- specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns. -- 新しいDagRunインスタンス を生成するために DagRun がどの位で立ち上がり タイムアウト / 失敗にするかを指定する タイムアウトは、スケジュールされたDagRunsのためだけに強制される ~~~~~~~~~~~~~~
https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#timeouts
より抜粋 ~~~~~~~~~~~~~~ execution_timeout controls the maximum time allowed for every execution. If execution_timeout is breached, the task times out and AirflowTaskTimeout is raised. ~~~~~~~~~~~~~~ => タイムアウト時に例外「AirflowException」が発生する
2)dag_file_processor_timeout
* DAGファイルプロセス DagFileProcessor 時のタイムアウト
New in version 1.10.6. How long before timing out a DagFileProcessor, which processes a dag file
3)dagbag_import_timeout
* Pythonファイルインポートでのタイムアウト
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dagbag-import-timeout
How long before timing out a python file import
使用上の注意
* Airflow v1系は、Timeoutされない?(詳細は以下のサイト参照)
https://stackoverflow.com/questions/57110885/how-to-define-a-timeout-for-apache-airflow-dags
* なお、 v2系(v2.0.2 in AWS MWAA)で試したところうまく動いている
使用イメージ
* DAGインスタンスを生成(New)するときに指定する
イメージ
from datetime import timedelta dag = DAG( 'test_timeout', schedule_interval=None, default_args=args, # ★注目 dagrun_timeout=timedelta(seconds=20), )
【2】タスクのタイムアウト関連のプロパティ
* タスクのタイムアウトに関して、関連する設定は、以下の通り。 1)execution_timeout
1)execution_timeout
より抜粋 ~~~~~~~~~ execution_timeout (datetime.timedelta) – max time allowed for the execution of this task instance, if it goes beyond it will raise and fail. – このタスクインスタンスの実行するための最大時間。 もし超えた場合は例外が発生し失敗する ~~~~~~~~~ => タイムアウト時に例外「AirflowTaskTimeout」が発生する
使用イメージ
* 「デフォルトでの設定」と「タスク個別に設定」することが可能
イメージ
from datetime import timedelta # デフォルトでの設定例 default_args = { "provide_context": True, "execution_timeout": timedelta(seconds=10), } # オペレータ個別による設定例 task1 = PythonOperator( task_id='task1', python_callable=say_hello, execution_timeout=timedelta(seconds=3), dag=dag )
【3】サンプル
例1:dagrun_timeoutの試験コード
import os import time from datetime import timedelta from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator def say_hello(**context): print(f"こんにちは世界。10秒寝ます Zzz") # ★注目★ time.sleep(10) print(f"Done") defalut_args = { "start_date": days_ago(2), "provide_context": True } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), description='This is a simple demo.', default_args=defalut_args, schedule_interval=None, # ★注目:期待としては、Job2の途中でタイムアウトする dagrun_timeout=timedelta(seconds=15), tags=['hello_world'], ) as dag: job1 = PythonOperator( task_id='say_hello_task1', dag=dag, python_callable=say_hello, ) job2 = PythonOperator( task_id='say_hello_task2', dag=dag, python_callable=say_hello, ) job3 = PythonOperator( task_id='say_hello_task3', dag=dag, python_callable=say_hello, ) job1 >> job2 >> job3
出力結果
ログより抜粋(タイムアウト時に例外「AirflowException」が発生する) [2021-10-14 17:16:46,001] {taskinstance.py:1265} ERROR - Received SIGTERM. Terminating subprocesses. [2021-10-14 17:16:46,011] {taskinstance.py:1482} ERROR - Task failed with exception Traceback (most recent call last): ...略... raise AirflowException("Task received SIGTERM signal")
例2:execution_timeoutの試験コード
import os import time from datetime import timedelta from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator def say_hello(**context): print(f"こんにちは世界。30秒寝ます Zzz") # ★注目★ time.sleep(30) print(f"Done") defalut_args = { "start_date": days_ago(2), "provide_context": True, # ★注目★ # Timeout時間が10秒で、上で30秒かかるのでタイムアウトするはず "execution_timeout": timedelta(seconds=10), } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), description='This is a simple demo.', default_args=defalut_args, schedule_interval=None, tags=['hello_world'], ) as dag: job1 = PythonOperator( task_id='say_hello_task', dag=dag, python_callable=say_hello, # ★注目:下のコメントアウトを外すと、設定が上書きされてタイムアウトにならない★ # execution_timeout=timedelta(seconds=40) ) job1
出力結果
ログより抜粋(タイムアウト時に例外「AirflowTaskTimeout」が発生する) [2021-10-15 18:29:40,238] {timeout.py:42} ERROR - Process timed out, PID: 999 [2021-10-15 18:29:40,238] {dagbag.py:259} ERROR - Failed to import: /root/airflow/dags/hello.py Traceback (most recent call last): ...略... File "/root/airflow/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout raise AirflowTaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 999
参考文献
https://qiita.com/munaita_/items/7fe474369a190d8d4ee6
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703