■ はじめに
Apache Airflow の 設定値を保持する仕組み・機構が揃っているので、 その内の Variable について扱う
目次
【0】設定値の保持 - Variable 【1】Web UIからデータ追加 【2】CLIからデータ追加 【3】REST APIからデータ追加 【4】DAG からデータ追加 【5】サンプル
【0】設定値の保持 - Variable
* 設定値を保持・管理してくれる機能(これって環境変数?) * キーの値が、パスワード的なものだと、 Web UIで見た際に「******」みたいな感じで表示してくれる
公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
【1】Web UIからデータ追加
* Web UI上で追加できる方法は、以下の通り。
1)値を追加
[1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択 [2] 「+ (Add a new record)」押下 [3] Key/Val にそれぞれ、設定キー/値を入力し「Save」ボタン押下
2)ファイルからのインポート
[0] 事前準備として、以下のようなJSONファイルを作成 => key名にパスワードっぽいの名前を付けると Web UI表示時にマスクしてくれたりとかしてくれる ~~~~~~~~ { "your_slack_api_token": "xxxxxxxxx", "your_rds_username": "admin", "your_rds_password": "your-admin-password" } ~~~~~~~~ [1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択 [2] 「Choose File」押下し、[0]のJSONファイルを選択し、 「Import Variables」を押下
【2】CLIからデータ追加
* 他にも、CLIで追加する方法などがある => CI/CD時などで使えそう => CLIについては、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/21/130702
【3】REST API からデータ追加
* 以下の公式ドキュメントを参照
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#tag/Variable
AIRFLOW_URL="http://localhost:8080/api/v1/variables" AIRFLOW_USERNAME="airflow" AIRFLOW_PASSWORD="airflow" KEY="message" VALUE="Hello World" # ★ここ★ curl -X POST "${AIRFLOW_URL}" \ -H "Content-Type: application/json" \ --user "${AIRFLOW_USERNAME}:${AIRFLOW_PASSWORD}" \ -d "{\"key\": \"${KEY}\", \"value\": \"${VALUE}\"}"
【4】DAG からデータ追加
* Variable.set() を使って、DAG内 で設定する => 以下のサイトを参照。
https://qiita.com/pioho07/items/fd07eb495b8f7b9e8a9d
1)サンプル
例1:値の取得
import os from airflow import DAG from airflow.models import Variable # ★注目★ from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator def say_hello(**context): # ★注目★ token = Variable.get("your_slack_api_token") rds_username = Variable.get("your_rds_username") rds_password = Variable.get("your_rds_password") # ★上記のJSONファイルをインポートしていれば、出力は以下のようになる # => token=xxxxxxxxx, rds_username=admin, rds_password=your-admin-password print(f"token={token}, rds_username={rds_username}, rds_password={rds_password}") defalut_args = { "start_date": days_ago(2), "provide_context": True } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), description='This is a simple demo.', default_args=defalut_args, schedule_interval=None, tags=['hello_world'], ) as dag: job1 = PythonOperator( task_id='say_hello_task', dag=dag, python_callable=say_hello, ) job1
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
MWAA ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/28/002530