■ はじめに
Apache Airflow の DAG の コンストラクタ引数やデフォルト引数について 調査する必要ができたので、メモ。
目次
【0】DAGの引数 【1】基本的な引数 1)dag_id 2)description 3)default_args 4)tags 【2】実行タイミングに関わる引数 【3】コールバック関数 1)on_success_callback 2)on_failure_callback 3)sla_miss_callback 【4】タイムアウト関連の引数 【5】同時実行 / 並列関連の引数
【0】DAGの引数
* DAGの全ての引数は、以下の公式サイトを参照。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
http://man.hubwiz.com/docset/Airflow.docset/Contents/Resources/Documents/code.html#airflow.models.DAG
【1】基本的な引数
* 以下があげられる。 ~~~~~~~ 1)dag_id 2)description 3)default_args 4)tags ~~~~~~~
1)dag_id
* DAG の ID => 唯一の必須項目
データ型 / デフォルト値
* str
2)description
* DAGの説明文 => Web UI 上に表示される
データ型 / デフォルト値
* Optional[str] * Default : None
3)default_args
* 各オペレーターに渡すデフォルト引数
https://airflow.readthedocs.io/en/1.9.0/concepts.html#default-arguments
If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. [訳] default_args の dict が DAGから渡されたら、 それぞれのOperatorに、その値が適用されます。 ※ 以下の「サンプル」を見れば、イメージしやすい
サンプル
default_args=dict( owner='Airflow') dag = DAG('my_dag', default_args=default_args) op = DummyOperator(task_id='dummy', dag=dag) print(op.owner) # Airflow
どんな値がサポートされているのか?
* 以下に記載されている。 => リトライ関連の「retries」などは 設定した方がコードとしてすっきりしそう
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#default-arguments
# より抜粋 # These args will get passed on to each operator # [訳] それぞれの引数は各オペレータに渡されます # You can override them on a per-task basis during operator initialization # [訳] オペレータ初期化の間、これらの値をタスク単位で上書きすることができます default_args = { # 所有者 'owner': 'airflow', # 過去に依存するOperatorかどうか 'depends_on_past': False, # 障害発生時などにメール送信を行う宛先 'email': ['airflow@example.com'], # タスク失敗時にメールを送信するか否か 'email_on_failure': False, # タスクのリトライが発生した際にメールを送信するか否か 'email_on_retry': False, # タスク失敗時のリトライ回数 'retries': 1, # タスクが失敗してからリトライが行われるまでの待ち時間 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 優先順位の重みづけ # 'priority_weight': 10, # タスクの終了日時 # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'dag': dag, # 'sla': timedelta(hours=2), # 実行タイムアウト # 'execution_timeout': timedelta(seconds=300), # 失敗時のコールバック関数 # 'on_failure_callback': some_function, # 成功時のコールバック関数 # 'on_success_callback': some_other_function, # リトライ時のコールバック関数 # 'on_retry_callback': another_function, # SLAタイムアウト時のコールバック関数 # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' }
データ型 / デフォルト値
* Optional[Dict] * Default : None
4)tags
* タグ一覧 => Web UI 上でフィルタリングするのに便利
データ型 / デフォルト値
* Optional[List[str]] * Default : None
【2】実行タイミングに関わる引数
* 以下があげられる。 ~~~~~~~ 1)start_date 2)end_date 3)schedule_interval 4)catchup 5)timetable ~~~~~~~ => 詳細は、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2022/01/15/014005
【3】コールバック関数
* 以下があげられる。 ~~~~~~~ 1)on_success_callback 2)on_failure_callback 3)sla_miss_callback ~~~~~~~
1)on_success_callback
* 成功時のコールバック関数を登録
データ型 / デフォルト値
* Optional[DagStateChangeCallback] * Default : None
2)on_failure_callback
* 失敗時のコールバック関数を登録
データ型 / デフォルト値
* Optional[DagStateChangeCallback] * Default : None
3)sla_miss_callback
* SLAタイムアウト時のコールバック関数を登録 => SLA = Service Level Agreement => 以下のサイトが詳しいので、詳細はこちらを参照。
https://future-architect.github.io/articles/20200827/
データ型 / デフォルト値
* Optional[Callable[[‘DAG’, str, str, List[str], List[TaskInstance]], None]] * Default : None
【4】タイムアウト関連の引数
* 以下があげられる。 ~~~~~~~ 1)dagrun_timeout ~~~~~~~ => 詳細は、以下の関連記事を参照のこと。
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
【5】同時実行 / 並列関連の引数
* 以下があげられる。 ~~~~~~~ 1)max_active_tasks / concurrency 2)max_active_runs ~~~~~~~ => 詳細は、以下の関連記事を参照のこと。
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
参考文献
https://future-architect.github.io/articles/20200131/#1-%E6%97%A5%E6%99%82
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840