【Airflow】Apache Airflow ~ 同時実行 / 並列関連 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/10/000000
https://dk521123.hatenablog.com/entry/2021/10/12/000000

の続き。

今回は、同実行数などの並列・マルチスレッド関連の設定について扱う

目次

【1】airflow.cfgでの制御 - Airflow全体の設定
 1)parallelism
 2)max_active_tasks_per_dag / dag_concurrency
 3)max_active_runs_per_dag
 4)parsing_processes / max_threads
 5)default_pool_task_slot_count / non_pooled_task_slot_count
 6)executor
 7)task_runner
【2】DAGでの制御
 1)max_active_tasks / concurrency
 2)max_active_runs
 3)実験コード

【1】airflow.cfgでの制御 - Airflow全体の設定

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#core

に記載されている。

このページ内で、「parallel」「thread」「concurr(ency)」などの
キーワードで検索して調べてみた。

1)parallelism

* Airflow 全体の Task instance の並列実行数

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parallelism

This defines the maximum number of task instances
 that can run concurrently in Airflow
 regardless of scheduler count and worker count.
スケジューラ数やワーカー数に関わらず、
Airflow内の同時実行するタスクインスタンスの最大数を定義する

Generally, this value is reflective of the number of task instances
 with the running state in the metadata database.
一般的に、この値は、メタデータのデータベース内の
ステータス「実行中」のタスクインスタンスの数が反映される

cf reflective : 反映

2)max_active_tasks_per_dag / dag_concurrency

* 指定したDAGごとの同時実行のタスク最大数
* 「【2】DAGでの制御」の「1)max_active_tasks / concurrency」も参照

a) max_active_tasks_per_dag
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag

New in version 2.2.0.

The maximum number of task instances allowed
 to run concurrently in each DAG.
このタスクインスタンスの最大数は、各DAGでの実行を許可する

To calculate the number of tasks
 that is running concurrently for a DAG,
 add up the number of running tasks for all DAG runs of the DAG.
DAGでのタスクの同時実行数を計算するために、
DAGの全てのDAG 実行のために実行タスク数を足し合わせてください。

This is configurable at the DAG level with max_active_tasks,
 which is defaulted as max_active_tasks_per_dag.
これは、デフォルト値がmax_active_tasks_per_dagとして、
max_active_tasksを使ってDAGレベルで設定できる
(「【2】DAGでの制御」の「1)max_active_tasks / concurrency」を参照)

An example scenario when this would be useful is
 when you want to stop a new dag with an early start date
 from stealing all the executor slots in a cluster.
この有効なシナリオ例として、
開始日時が早すぎてしまい、クラスタ内の全ての実行スロットから
新しいDAGを停止したい時である

b) dag_concurrency

* version 2.2.0より前のバージョンで適用(以降は非推奨)
 => MWAA だと、v2.0.2なので、こちらを使うことになるが、、、

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated

Deprecated since version 2.2.0
: The option has been moved to core.max_active_tasks_per_dag

3)max_active_runs_per_dag

* DAG ごとの DAG同時実行最大数
* 「【2】DAGでの制御」の「2)max_active_runs」も参照

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-runs-per-dag

The maximum number of active DAG runs per DAG.
DAGごとのアクティブなDAGの最大実行数

The scheduler will not create more DAG runs if it reaches the limit.
もしその限度に行き着いた場合、
そのスケジューラはDAG実行をこれ以上生成しない。

This is configurable at the DAG level with max_active_runs,
 which is defaulted as max_active_runs_per_dag.
これは、デフォルト値がmax_active_runs_per_dagとして、
max_active_runsを使ってDAGレベルで設定できる
(「【2】DAGでの制御」の「2)max_active_runs」を参照)

4)parsing_processes / max_threads

* SchedulerノードのSchedulerプロセス数
* DAG解析/タスク生成/タスクのスケジューリングを行うプロセス

a) parsing_processes
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#config-scheduler-parsing-processes

New in version 1.10.14.

The scheduler can run multiple processes in parallel to parse dags.
スケジューラは、DAGをパースするために並列のマルチプロセスで実行できる
This defines how many processes will run.
これは、どの位のプロセスが実行できるかを定義する

b) max_threads
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-threads-deprecated

Deprecated since version 1.10.14:
 The option has been moved to scheduler.parsing_processes

5)default_pool_task_slot_count / non_pooled_task_slot_count

* default_poolのタスクスロット数

a) default_pool_task_slot_count
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#default-pool-task-slot-count

New in version 2.2.0.

Task Slot counts for default_pool.
default_poolのタスクスロット数

This setting would not have any effect in an existing deployment
 where the default_pool is already created.
この設定は、default_poolが既に作成されている存在する開発環境内において
何も影響がないでしょう。

For existing deployments, users can change the number of slots
 using Webserver, API or the CLI
存在する環境において、ユーザは、Webserver, API, CLIを使って
スロット数を変えることができる。

b) non_pooled_task_slot_count
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#non-pooled-task-slot-count-deprecated

non_pooled_task_slot_count (Deprecated)

Deprecated since version 1.10.4:
v1.10.4から非推奨

 The option has been moved to core.default_pool_task_slot_count

6)executor

* Airflowで使用する実行(executor)クラス(※1)

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#executor

The executor class that airflow should use.
Airflowで使用されているexecutorクラス。

Choices include
 SequentialExecutor,
 LocalExecutor,
 CeleryExecutor,
 DaskExecutor,
 KubernetesExecutor,
 CeleryKubernetesExecutor
 or the full import path to the class when using a custom executor.

※1:実行(executor)クラス

* 以下の公式サイトを参照

https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html

* 以下の一般サイトも参照。

https://dev.classmethod.jp/articles/apache-airflow-parallelism-and-concurrency/

7)task_runner

* サブプロセス内でのタスクインスタンス実行で使用するクラス
(StandardTaskRunner/CgroupTaskRunner)

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-runner

The class to use for running task instances in a subprocess.
サブプロセス内でのタスクインスタンス実行で使用するクラス

Choices include
 StandardTaskRunner,
 CgroupTaskRunner
 or the full import path to the class when using a custom task runner.
StandardTaskRunner, CgroupTaskRunner又は
独自Task runnerを使ったクラスのフルインポートパスを選択してください

StandardTaskRunner
https://github.com/apache/airflow/blob/main/airflow/task/task_runner/standard_task_runner.py
CgroupTaskRunner
https://github.com/apache/airflow/blob/main/airflow/task/task_runner/cgroup_task_runner.py

【2】DAGでの制御

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG

に記載されている。

1)max_active_tasks / concurrency

* 指定したDAGのおける同時実行のタスク最大数
* 「【1】airflow.cfgでの制御 - Airflow全体の設定」の
 「2)max_active_tasks_per_dag / dag_concurrency」も参照
* MWAA(v2.0.2)の場合、「concurrency」を使用する

a) max_active_tasks

max_active_tasks (int)
 -- the number of task instances allowed to run concurrently
 -- 同時実行を許容するタスクインスタンスの数

b) concurrency

* 既に非推奨
 => Airflow v3.0 で削除されそう
 (「TODO: Remove in Airflow 3.0」ってあるし)

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/models/dag.html#DAG.concurrency

    @property
    def concurrency(self) -> int:
        # TODO: Remove in Airflow 3.0
        warnings.warn(
            "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._max_active_tasks

2)max_active_runs

* 指定したDAGに対して、同時に実行できるDAGインスタンス数
* 「【1】airflow.cfgでの制御 - Airflow全体の設定」の
 「3)max_active_runs_per_dag」も参照

max_active_runs (int)
 -- maximum number of active DAG runs,
 beyond this number of DAG runs in a running state,
 the scheduler won't create new active DAG runs
 -- アクティブなDAG実行インスタンスの最大数で
 ステータス「実行中」で、このDAG実行インスタンスの数を超えた場合
 スケジューラは新しいアクティブなDAG実行インスタンスを生成しない

補足:「max_active_runs = 1」の動作について
https://github.com/apache/airflow/issues/9975

~~~~
max_active_runs = 1 can still create multiple active execution runs

I have max_active_runs = 1 in my dag file
 (which consists of multiple tasks) and I manually triggered a dag.

While it was running, a second execution began
 under its scheduled time while the first execution was running.
~~~~

同様なことが、MWAA(v2.0.2)でも同じ結果で、
max_active_runs = 1を設定したにも関わらず、
実行中でも、手動で実行でき、複数起動できてしまった

https://github.com/apache/airflow/issues/9975#issuecomment-927015597

~~~~
Issue does not happen on 2.1.4
~~~~

確かに、MWAA(v2.0.2)の環境で、
以下の実験コードのように「max_active_runs = 1」を指定したのだが
複数で実行できてしまった。。。(v2.1.4 だと修正されている?)

3)実験コード

import os
import time
import datetime
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。10秒寝ます Zzz ... {datetime.datetime.now()}")
  time.sleep(10)
  print(f"Done ... {datetime.datetime.now()}")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True,
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  # ★注目★
  # 例えば、concurrency=2を指定した場合、
  # job1_1, job1_2 は同時実行できるが、job1_3は待たされる
  concurrency=2,
  #max_active_tasks=2,
  max_active_runs=1,
  tags=['hello_world'],
) as dag:
  job1_1 = PythonOperator(
    task_id='say_hello_task1_1',
    dag=dag,
    python_callable=say_hello,
  )
  job1_2 = PythonOperator(
    task_id='say_hello_task1_2',
    dag=dag,
    python_callable=say_hello,
  )
  job1_3 = PythonOperator(
    task_id='say_hello_task1_3',
    dag=dag,
    python_callable=say_hello,
  )
  job2 = PythonOperator(
    task_id='say_hello_task2',
    dag=dag,
    python_callable=say_hello,
  )
  [job1_1, job1_2, job1_3] >> job2

参考文献

https://scrapbox.io/ohbarye/Airflow%E3%81%AEDAG%E3%82%84task%E3%81%AE%E5%90%8C%E6%99%82%E5%AE%9F%E8%A1%8C%E6%95%B0
https://dev.classmethod.jp/articles/apache-airflow-parallelism-and-concurrency/
https://qiita.com/tmy310/items/9d657445b768f797c990

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101