■ はじめに
https://dk521123.hatenablog.com/entry/2021/10/10/000000
https://dk521123.hatenablog.com/entry/2021/10/12/000000
の続き。 今回は、同実行数などの並列・マルチスレッド関連の設定について扱う
目次
【1】airflow.cfgでの制御 - Airflow全体の設定 1)parallelism 2)max_active_tasks_per_dag / dag_concurrency 3)max_active_runs_per_dag 4)parsing_processes / max_threads 5)default_pool_task_slot_count / non_pooled_task_slot_count 6)executor 7)task_runner 【2】DAGでの制御 1)max_active_tasks / concurrency 2)max_active_runs 3)実験コード
【1】airflow.cfgでの制御 - Airflow全体の設定
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#core
に記載されている。 このページ内で、「parallel」「thread」「concurr(ency)」などの キーワードで検索して調べてみた。
1)parallelism
* Airflow 全体の Task instance の並列実行数
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parallelism
This defines the maximum number of task instances that can run concurrently in Airflow regardless of scheduler count and worker count. スケジューラ数やワーカー数に関わらず、 Airflow内の同時実行するタスクインスタンスの最大数を定義する Generally, this value is reflective of the number of task instances with the running state in the metadata database. 一般的に、この値は、メタデータのデータベース内の ステータス「実行中」のタスクインスタンスの数が反映される cf reflective : 反映
2)max_active_tasks_per_dag / dag_concurrency
* 指定したDAGごとの同時実行のタスク最大数 * 「【2】DAGでの制御」の「1)max_active_tasks / concurrency」も参照
a) max_active_tasks_per_dag
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag
New in version 2.2.0. The maximum number of task instances allowed to run concurrently in each DAG. このタスクインスタンスの最大数は、各DAGでの実行を許可する To calculate the number of tasks that is running concurrently for a DAG, add up the number of running tasks for all DAG runs of the DAG. DAGでのタスクの同時実行数を計算するために、 DAGの全てのDAG 実行のために実行タスク数を足し合わせてください。 This is configurable at the DAG level with max_active_tasks, which is defaulted as max_active_tasks_per_dag. これは、デフォルト値がmax_active_tasks_per_dagとして、 max_active_tasksを使ってDAGレベルで設定できる (「【2】DAGでの制御」の「1)max_active_tasks / concurrency」を参照) An example scenario when this would be useful is when you want to stop a new dag with an early start date from stealing all the executor slots in a cluster. この有効なシナリオ例として、 開始日時が早すぎてしまい、クラスタ内の全ての実行スロットから 新しいDAGを停止したい時である
b) dag_concurrency
* version 2.2.0より前のバージョンで適用(以降は非推奨) => MWAA だと、v2.0.2なので、こちらを使うことになるが、、、
Deprecated since version 2.2.0 : The option has been moved to core.max_active_tasks_per_dag
3)max_active_runs_per_dag
* DAG ごとの DAG同時実行最大数 * 「【2】DAGでの制御」の「2)max_active_runs」も参照
The maximum number of active DAG runs per DAG. DAGごとのアクティブなDAGの最大実行数 The scheduler will not create more DAG runs if it reaches the limit. もしその限度に行き着いた場合、 そのスケジューラはDAG実行をこれ以上生成しない。 This is configurable at the DAG level with max_active_runs, which is defaulted as max_active_runs_per_dag. これは、デフォルト値がmax_active_runs_per_dagとして、 max_active_runsを使ってDAGレベルで設定できる (「【2】DAGでの制御」の「2)max_active_runs」を参照)
4)parsing_processes / max_threads
* SchedulerノードのSchedulerプロセス数 * DAG解析/タスク生成/タスクのスケジューリングを行うプロセス
a) parsing_processes
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#config-scheduler-parsing-processes
New in version 1.10.14. The scheduler can run multiple processes in parallel to parse dags. スケジューラは、DAGをパースするために並列のマルチプロセスで実行できる This defines how many processes will run. これは、どの位のプロセスが実行できるかを定義する
b) max_threads
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-threads-deprecated
Deprecated since version 1.10.14: The option has been moved to scheduler.parsing_processes
5)default_pool_task_slot_count / non_pooled_task_slot_count
* default_poolのタスクスロット数
a) default_pool_task_slot_count
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#default-pool-task-slot-count
New in version 2.2.0. Task Slot counts for default_pool. default_poolのタスクスロット数 This setting would not have any effect in an existing deployment where the default_pool is already created. この設定は、default_poolが既に作成されている存在する開発環境内において 何も影響がないでしょう。 For existing deployments, users can change the number of slots using Webserver, API or the CLI 存在する環境において、ユーザは、Webserver, API, CLIを使って スロット数を変えることができる。
b) non_pooled_task_slot_count
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#non-pooled-task-slot-count-deprecated
non_pooled_task_slot_count (Deprecated) Deprecated since version 1.10.4: v1.10.4から非推奨 The option has been moved to core.default_pool_task_slot_count
6)executor
* Airflowで使用する実行(executor)クラス(※1)
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#executor
The executor class that airflow should use. Airflowで使用されているexecutorクラス。 Choices include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor, CeleryKubernetesExecutor or the full import path to the class when using a custom executor.
※1:実行(executor)クラス
* 以下の公式サイトを参照
https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html
* 以下の一般サイトも参照。
https://dev.classmethod.jp/articles/apache-airflow-parallelism-and-concurrency/
7)task_runner
* サブプロセス内でのタスクインスタンス実行で使用するクラス (StandardTaskRunner/CgroupTaskRunner)
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-runner
The class to use for running task instances in a subprocess. サブプロセス内でのタスクインスタンス実行で使用するクラス Choices include StandardTaskRunner, CgroupTaskRunner or the full import path to the class when using a custom task runner. StandardTaskRunner, CgroupTaskRunner又は 独自Task runnerを使ったクラスのフルインポートパスを選択してください
StandardTaskRunner
https://github.com/apache/airflow/blob/main/airflow/task/task_runner/standard_task_runner.py
CgroupTaskRunner
https://github.com/apache/airflow/blob/main/airflow/task/task_runner/cgroup_task_runner.py
【2】DAGでの制御
に記載されている。
1)max_active_tasks / concurrency
* 指定したDAGのおける同時実行のタスク最大数 * 「【1】airflow.cfgでの制御 - Airflow全体の設定」の 「2)max_active_tasks_per_dag / dag_concurrency」も参照 * MWAA(v2.0.2)の場合、「concurrency」を使用する
a) max_active_tasks
max_active_tasks (int) -- the number of task instances allowed to run concurrently -- 同時実行を許容するタスクインスタンスの数
b) concurrency
* 既に非推奨 => Airflow v3.0 で削除されそう (「TODO: Remove in Airflow 3.0」ってあるし)
@property def concurrency(self) -> int: # TODO: Remove in Airflow 3.0 warnings.warn( "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.", DeprecationWarning, stacklevel=2, ) return self._max_active_tasks
2)max_active_runs
* 指定したDAGに対して、同時に実行できるDAGインスタンス数 * 「【1】airflow.cfgでの制御 - Airflow全体の設定」の 「3)max_active_runs_per_dag」も参照 max_active_runs (int) -- maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs -- アクティブなDAG実行インスタンスの最大数で ステータス「実行中」で、このDAG実行インスタンスの数を超えた場合 スケジューラは新しいアクティブなDAG実行インスタンスを生成しない
補足:「max_active_runs = 1」の動作について
https://github.com/apache/airflow/issues/9975
~~~~ max_active_runs = 1 can still create multiple active execution runs I have max_active_runs = 1 in my dag file (which consists of multiple tasks) and I manually triggered a dag. While it was running, a second execution began under its scheduled time while the first execution was running. ~~~~ 同様なことが、MWAA(v2.0.2)でも同じ結果で、 max_active_runs = 1を設定したにも関わらず、 実行中でも、手動で実行でき、複数起動できてしまった
https://github.com/apache/airflow/issues/9975#issuecomment-927015597
~~~~ Issue does not happen on 2.1.4 ~~~~ 確かに、MWAA(v2.0.2)の環境で、 以下の実験コードのように「max_active_runs = 1」を指定したのだが 複数で実行できてしまった。。。(v2.1.4 だと修正されている?)
3)実験コード
import os import time import datetime from datetime import timedelta from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator def say_hello(**context): print(f"こんにちは世界。10秒寝ます Zzz ... {datetime.datetime.now()}") time.sleep(10) print(f"Done ... {datetime.datetime.now()}") defalut_args = { "start_date": days_ago(2), "provide_context": True, } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), description='This is a simple demo.', default_args=defalut_args, schedule_interval=None, # ★注目★ # 例えば、concurrency=2を指定した場合、 # job1_1, job1_2 は同時実行できるが、job1_3は待たされる concurrency=2, #max_active_tasks=2, max_active_runs=1, tags=['hello_world'], ) as dag: job1_1 = PythonOperator( task_id='say_hello_task1_1', dag=dag, python_callable=say_hello, ) job1_2 = PythonOperator( task_id='say_hello_task1_2', dag=dag, python_callable=say_hello, ) job1_3 = PythonOperator( task_id='say_hello_task1_3', dag=dag, python_callable=say_hello, ) job2 = PythonOperator( task_id='say_hello_task2', dag=dag, python_callable=say_hello, ) [job1_1, job1_2, job1_3] >> job2
参考文献
https://scrapbox.io/ohbarye/Airflow%E3%81%AEDAG%E3%82%84task%E3%81%AE%E5%90%8C%E6%99%82%E5%AE%9F%E8%A1%8C%E6%95%B0
https://dev.classmethod.jp/articles/apache-airflow-parallelism-and-concurrency/
https://qiita.com/tmy310/items/9d657445b768f797c990
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101