【Airflow】Apache Airflow ~ Variable ~

■ はじめに

Apache Airflow の 
設定値を保持する仕組み・機構が揃っているので、
その内の Variable について扱う

目次

【0】設定値の保持 - Variable
【1】Web UIからデータ追加
【2】CLIからデータ追加
【3】REST APIからデータ追加
【4】DAG からデータ追加
【5】サンプル

【0】設定値の保持 - Variable

* 設定値を保持・管理してくれる機能(これって環境変数?)
* キーの値が、パスワード的なものだと、
 Web UIで見た際に「******」みたいな感じで表示してくれる

公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html

【1】Web UIからデータ追加

* Web UI上で追加できる方法は、以下の通り。

1)値を追加

[1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択
[2] 「+ (Add a new record)」押下
[3] Key/Val にそれぞれ、設定キー/値を入力し「Save」ボタン押下

2)ファイルからのインポート

[0] 事前準備として、以下のようなJSONファイルを作成
 => key名にパスワードっぽいの名前を付けると
  Web UI表示時にマスクしてくれたりとかしてくれる
~~~~~~~~
{
  "your_slack_api_token": "xxxxxxxxx",
  "your_rds_username": "admin",
  "your_rds_password": "your-admin-password"
}  
~~~~~~~~

[1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択
[2] 「Choose File」押下し、[0]のJSONファイルを選択し、
  「Import Variables」を押下

【2】CLIからデータ追加

* 他にも、CLIで追加する方法などがある
 => CI/CD時などで使えそう
 => CLIについては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/21/130702

【3】REST API からデータ追加

* 以下の公式ドキュメントを参照

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#tag/Variable

サンプル
https://stackoverflow.com/questions/75695446/what-should-be-structure-of-payload-to-import-multiple-variables-to-airflow-ui-u

AIRFLOW_URL="http://localhost:8080/api/v1/variables"
AIRFLOW_USERNAME="airflow"
AIRFLOW_PASSWORD="airflow"

KEY="message"
VALUE="Hello World"

# ★ここ★
curl -X POST "${AIRFLOW_URL}" \
  -H "Content-Type: application/json" \
  --user "${AIRFLOW_USERNAME}:${AIRFLOW_PASSWORD}" \
  -d "{\"key\": \"${KEY}\", \"value\": \"${VALUE}\"}"

【4】DAG からデータ追加

* Variable.set() を使って、DAG内 で設定する
 => 以下のサイトを参照。

https://qiita.com/pioho07/items/fd07eb495b8f7b9e8a9d

1)サンプル

例1:値の取得

import os

from airflow import DAG
from airflow.models import Variable # ★注目★
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  # ★注目★
  token = Variable.get("your_slack_api_token")
  rds_username = Variable.get("your_rds_username")
  rds_password = Variable.get("your_rds_password")
  # ★上記のJSONファイルをインポートしていれば、出力は以下のようになる
  # => token=xxxxxxxxx, rds_username=admin, rds_password=your-admin-password
  print(f"token={token}, rds_username={rds_username}, rds_password={rds_password}")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task',
    dag=dag,
    python_callable=say_hello,
  )
  job1

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
MWAA ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/28/002530