【Airflow】Apache Airflow ~ CLI ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/10/000000

でリトライ(Re-run)を扱ったが、その調査の過程で、
以下のサイト

https://blog.imind.jp/entry/2019/02/22/000049

などで コマンド(CLI; Command Line Interface)からでも、
タスクをクリアするとか色々できることをした。

そんなことを含めて、CLIについて、まとめておく。

 また、今回のブログを書いていて思ったのだが、
上記のサイトだとAirflow v1がベースなようだが
今回は、MWAA(v2.0.2)に合わせて、v2ベースで書いていく。

Airflow v1とv2だとCLIの構文が異なっている模様。
v3 も出るらしいし、この記事もすぐに古くなっていくな、、、

目次

【0】Airflowの CLI
 1)API仕様
 2)基本的な構文
 補足:MWAAにおける CLI
【1】cheat-sheet / help
 1)cheat-sheet
 2)help
【2】dag
 1)backfill
 2)pause
 3)unpause
【3】tasks
 1)clear
 2)run
【4】connections
 1)add
 2)list
【5】variables
 1)import
【6】airflow CLI あれこれ
 1)DAGから airflow CLIを実行するには

【0】Airflowの CLI

1)API仕様

* 公式サイトは、以下。

v2.0.2
https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html
最新版(stable)
https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html

2)基本的な構文

airflow [-h] GROUP_OR_COMMAND ...

# GROUP_OR_COMMAND : 「tasks」「users」や「add」「delete」など

補足:MWAAにおける CLI

Amazon MWAA(Managed Workflows for Apache Airflow) だと
若干異なることもあるようなので、メモ。(必要だったら、別記事で扱うかも)

* 使用できないコマンドとかあるらしい

https://docs.aws.amazon.com/ja_jp/mwaa/latest/userguide/airflow-cli-command-reference.html

* 実行するのに、トークンの発行とかもあるみたい

https://docs.aws.amazon.com/ja_jp/mwaa/latest/userguide/call-mwaa-apis-cli.html

【1】cheat-sheet / help

1)cheat-sheet

* help 的なコマンド
 => コマンドをど忘れた際に、、、

コマンド例

airflow cheat-sheet

以下、出力結果
Miscellaneous commands
airflow cheat-sheet                       | Display cheat sheet
・・・略・・・
airflow version                           | Show the version
・・・略・・・

2)Help

コマンド例

airflow connections add -h

以下、出力結果
usage: airflow connections add [-h] [--conn-description CONN_DESCRIPTION]
・・・略・・・

【2】dag

* DAGをコントロールするコマンド

基本的な構文
https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#dags

airflow dags [-h] COMMAND ...

# COMMAND : 
# backfill, delete, list, list-jobs, list-runs, next-execution, pause, report, show,
# state, test, trigger, unpause

1)backfill

https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#backfill

Run subsections of a DAG for a specified date range.
指定された日付範囲におけるサブセクションのDAGを実行する

If reset_dag_run option is used,
 backfill will first prompt users whether airflow should clear all the previous dag_run and task_instances
 within the backfill date range.
もし、reset_dag_run オプションを使った場合
backfill は、そのbackfill が指定された日付範囲内で、
airflow が全ての前回のdag_run とtask_instancesをクリアするかどうかを
ユーザに促します

If rerun_failed_tasks is used,
 backfill will auto re-run the previous failed task instances within the backfill date range
もし、rerun_failed_tasks オプションを使った場合
backfill は、そのbackfill が指定された日付範囲内で、
自動的に前回エラーになったタスクインスタンスを再実行します。

構文

airflow dags backfill
 [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT] [-x]
 [-n] [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL]
 [--rerun-failed-tasks] [--reset-dagruns] [-B]
 [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
 dag_id

2)pause

* DAG の一時停止

構文

airflow dags pause [-h] [-S SUBDIR] dag_id

3)unpause

* DAG の再開

構文

airflow dags unpause [-h] [-S SUBDIR] dag_id

【3】tasks

* タスクをコントロールするコマンド

基本的な構文
https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#tasks

airflow tasks [-h] COMMAND ...

# COMMAND : 
#  clear, failed-deps, list, render, run, state, states-for-dag-run, test

1)clear

* タスクのクリア

構文
https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#clear

airflow tasks clear
 [-h] [-R] [-d] [-e END_DATE] [-X] [-x] [-f] [-r]
 [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-u] [-y]
 dag_id

コマンド例

# -s  START_DATE : Override start_date YYYY-MM-DD
# -t TASK_REGEX : The regex to filter specific task_ids to backfill (optional)
# -d : Include downstream tasks
# -y : Do not prompt to confirm reset. Use with care! (defalut: False)

airflow tasks clear -s 2021-10-21 -t task1 -d -y clear_upstream_task

2)run

* タスクの実行

構文
https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#run

airflow tasks run
 [-h] [--cfg-path CFG_PATH] [--error-file ERROR_FILE] [-f]
 [-A] [-i] [-I] [-N] [-l] [-m] [-p PICKLE] [--pool POOL]
 [--ship-dag] [-S SUBDIR]
 dag_id task_id execution_date

【4】connections

* Airflow の connections を管理するコマンド
* connections については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/16/000454

基本的な構文

airflow connections [-h] COMMAND ...

# COMMAND: 
#  add, delete, export, get, list

1)add

構文

airflow connections add
 [-h] [--conn-description CONN_DESCRIPTION]
 [--conn-extra CONN_EXTRA] [--conn-host CONN_HOST]
 [--conn-login CONN_LOGIN]
 [--conn-password CONN_PASSWORD]
 [--conn-port CONN_PORT] [--conn-schema CONN_SCHEMA]
 [--conn-type CONN_TYPE] [--conn-uri CONN_URI]
 conn_id

コマンド例

airflow connections add --conn-type http --conn-host https://hooks.slack.com/services --conn-extra {"webhook_token":"/A00000000/B111111/XXXXXXXXXXXXXXXXXXXXXXXX"} slack_webhook_demo

2)list

コマンド例

airflow connections list

【5】variables

* Airflow の variables を管理するコマンド
* variables については、以下の関連記事を参照のこと。

Apache Airflow ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/17/000000

基本的な構文

airflow variables [-h] COMMAND ...

# COMMAND: 
#  delete, export, get, import, list, set

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#variables

注意点

* add がない
 => 逆に、connections は import がない!!!??
 => set でやるっぽい (以下のサイト参照)

https://blog.imind.jp/entry/2019/04/22/184759

1)import

* JSON形式のファイルでインポートするコマンド

https://airflow.apache.org/docs/apache-airflow/2.0.2/cli-and-env-variables-ref.html#import_repeat2
構文

airflow variables import [-h] file

【6】airflow CLI あれこれ

1)DAGから airflow CLIを実行するには

* 以下のサイトにあるように「BashOperator」を使えば可能。
 => 因みに、このサンプルをMWAA(v2.0.2)上で実行したら、
  無限ループした。。。(同等のことをやりたい場合、制御が必要)

https://stackoverflow.com/questions/43270820/how-to-restart-a-failed-task-on-airflow

使用イメージ

from airflow.operators.bash_operator import BashOperator

def on_failure(context):
    # ★ここ★
    sample_job = BashOperator(
        task_id='sample_job',
        bash_command='airflow cheat-sheet'
    )
    return sample_job.execute(context=context)

# ・・・略・・・

    job1= BashOperator(
        task_id='job1',
        bash_command='exit 1',
        on_failure_callback=on_failure
    )

    job1

参考文献

https://blog.imind.jp/entry/2019/04/22/184759

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/17/000000
Apache Airflow ~ Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101
Docker ~ 基本編 / dockerコマンド ~
https://dk521123.hatenablog.com/entry/2020/04/13/000000