【Airflow】Apache Airflow ~ DAGの引数 ~

■ はじめに

 Apache Airflow の DAG の
コンストラクタ引数やデフォルト引数について
調査する必要ができたので、メモ。

目次

【0】DAGの引数
【1】基本的な引数
 1)dag_id
 2)description
 3)default_args
 4)tags
【2】実行タイミングに関わる引数
【3】コールバック関数
 1)on_success_callback
 2)on_failure_callback
 3)sla_miss_callback
【4】タイムアウト関連の引数
【5】同時実行 / 並列関連の引数

【0】DAGの引数

* DAGの全ての引数は、以下の公式サイトを参照。

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
http://man.hubwiz.com/docset/Airflow.docset/Contents/Resources/Documents/code.html#airflow.models.DAG

【1】基本的な引数

* 以下があげられる。
~~~~~~~
1)dag_id
2)description
3)default_args
4)tags
~~~~~~~

1)dag_id

* DAG の ID
 => 唯一の必須項目

データ型 / デフォルト値

* str

2)description

* DAGの説明文
 => Web UI 上に表示される

データ型 / デフォルト値

* Optional[str]
* Default : None

3)default_args

* 各オペレーターに渡すデフォルト引数

https://airflow.readthedocs.io/en/1.9.0/concepts.html#default-arguments

If a dictionary of default_args is passed to a DAG,
 it will apply them to any of its operators.

[訳] default_args の dict が DAGから渡されたら、
それぞれのOperatorに、その値が適用されます。

※ 以下の「サンプル」を見れば、イメージしやすい

サンプル

default_args=dict(
  owner='Airflow')

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)

print(op.owner) # Airflow

どんな値がサポートされているのか?

* 以下に記載されている。
 => リトライ関連の「retries」などは
  設定した方がコードとしてすっきりしそう

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#default-arguments

# より抜粋

# These args will get passed on to each operator
# [訳] それぞれの引数は各オペレータに渡されます
# You can override them on a per-task basis during operator initialization
# [訳] オペレータ初期化の間、これらの値をタスク単位で上書きすることができます
default_args = {
    # 所有者
    'owner': 'airflow',
    # 過去に依存するOperatorかどうか
    'depends_on_past': False,
    # 障害発生時などにメール送信を行う宛先
    'email': ['airflow@example.com'],
    # タスク失敗時にメールを送信するか否か
    'email_on_failure': False,
    # タスクのリトライが発生した際にメールを送信するか否か
    'email_on_retry': False,
    # タスク失敗時のリトライ回数
    'retries': 1,
    # タスクが失敗してからリトライが行われるまでの待ち時間
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 優先順位の重みづけ
    # 'priority_weight': 10,
    # タスクの終了日時
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 実行タイムアウト
    # 'execution_timeout': timedelta(seconds=300),
    # 失敗時のコールバック関数
    # 'on_failure_callback': some_function,
    # 成功時のコールバック関数
    # 'on_success_callback': some_other_function,
    # リトライ時のコールバック関数
    # 'on_retry_callback': another_function,
    # SLAタイムアウト時のコールバック関数
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

データ型 / デフォルト値

* Optional[Dict]
* Default : None

4)tags

* タグ一覧
 => Web UI 上でフィルタリングするのに便利

データ型 / デフォルト値

* Optional[List[str]]
* Default : None

【2】実行タイミングに関わる引数

* 以下があげられる。
~~~~~~~
1)start_date
2)end_date
3)schedule_interval
4)catchup
5)timetable
~~~~~~~

 => 詳細は、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2022/01/15/014005

【3】コールバック関数

* 以下があげられる。
~~~~~~~
1)on_success_callback
2)on_failure_callback
3)sla_miss_callback
~~~~~~~

1)on_success_callback

* 成功時のコールバック関数を登録

データ型 / デフォルト値

* Optional[DagStateChangeCallback]
* Default : None

2)on_failure_callback

* 失敗時のコールバック関数を登録

データ型 / デフォルト値

* Optional[DagStateChangeCallback]
* Default : None

3)sla_miss_callback

* SLAタイムアウト時のコールバック関数を登録
 => SLA = Service Level Agreement
 => 以下のサイトが詳しいので、詳細はこちらを参照。

https://future-architect.github.io/articles/20200827/

データ型 / デフォルト値

* Optional[Callable[[‘DAG’, str, str, List[str], List[TaskInstance]], None]]
* Default : None

【4】タイムアウト関連の引数

* 以下があげられる。
~~~~~~~
1)dagrun_timeout
~~~~~~~

 => 詳細は、以下の関連記事を参照のこと。

Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000

【5】同時実行 / 並列関連の引数

* 以下があげられる。
~~~~~~~
1)max_active_tasks / concurrency
2)max_active_runs
~~~~~~~

 => 詳細は、以下の関連記事を参照のこと。

Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148

参考文献

https://future-architect.github.io/articles/20200131/#1-%E6%97%A5%E6%99%82

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840