■ はじめに
色々と Airflow を触っていると 設定値を保持する仕組み・機構が揃っているので、 メモしておく
目次
【1】設定値の保持 - Variable 1)Web UIからデータの追加方法 2)それ以外の追加方法 3)サンプル 【2】コネクションの保持 - Connection 1)Web UIからデータの追加方法 2)設定例 3)サンプル
【1】設定値の保持 - Variable
* 設定値を保持・管理してくれる機能(これって環境変数?) * キーの値が、パスワード的なものだと、 Web UIで見た際に「******」みたいな感じで表示してくれる
公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
1)Web UIからデータの追加方法
* Web UI上で追加できる方法は、以下の通り。
a) 値を追加
[1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択 [2] 「+ (Add a new record)」押下 [3] Key/Val にそれぞれ、設定キー/値を入力し「Save」ボタン押下
b) ファイルからのインポート
[0] 事前準備として、以下のようなJSONファイルを作成 => key名にパスワードっぽいの名前を付けると Web UI表示時にマスクしてくれたりとかしてくれる ~~~~~~~~ { "your_slack_api_token": "xxxxxxxxx", "your_rds_username": "admin", "your_rds_password": "your-admin-password" } ~~~~~~~~ [1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択 [2] 「Choose File」押下し、[0]のJSONファイルを選択し、 「Import Variables」を押下
2)それ以外の追加方法
a) CLI
* 他にも、CLIで追加する方法などがある => CI/CD時などで使えそう => CLIについては、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/21/130702
b) DAG で設定する
* Variable.set() を使って、DAG内 で設定する => 以下のサイトを参照。
https://qiita.com/pioho07/items/fd07eb495b8f7b9e8a9d
3)サンプル
例1:値の取得
import os from airflow import DAG from airflow.models import Variable # ★注目★ from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator def say_hello(**context): # ★注目★ token = Variable.get("your_slack_api_token") rds_username = Variable.get("your_rds_username") rds_password = Variable.get("your_rds_password") # ★上記のJSONファイルをインポートしていれば、出力は以下のようになる # => token=xxxxxxxxx, rds_username=admin, rds_password=your-admin-password print(f"token={token}, rds_username={rds_username}, rds_password={rds_password}") defalut_args = { "start_date": days_ago(2), "provide_context": True } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), description='This is a simple demo.', default_args=defalut_args, schedule_interval=None, tags=['hello_world'], ) as dag: job1 = PythonOperator( task_id='say_hello_task', dag=dag, python_callable=say_hello, ) job1
【2】コネクションの保持 - Connection
* DBの接続情報 や Slack webhook用の接続情報を保持できる仕組み * AWS Glue でいう Glue connectionみたいなもの。
1)Web UIからデータの追加方法
a) 値を追加
[1] Airflow の Web UI ページにアクセスし、[Admin]-[Connections]を選択 [2] 「+ (Add a new record)」押下 [3] Conn id/Conn Typeなど にそれぞれ入力し「Save」ボタン押下
2)設定例
例1:SlackのWebhook
Slack の Webhook URLが、 https://hooks.slack.com/services/A00000000/B111111/XXXXXXXXXXXXXXXXXXXXXXXX の場合は、設定手順は以下の通り。 [1] 上記の「a) 値を追加」の[1]~[2] を実行 [2] 以下をそれぞれ入力し「Save」ボタン押下 + Conn id : 任意の名前 (今回は「slack_webhook_demo」) + Conn Type : HTTP + Host : https://hooks.slack.com/services + Extra : {"webhook_token":"/A00000000/B111111/XXXXXXXXXXXXXXXXXXXXXXXX"}
3)サンプル
SlackWebhookOperatorを使ってSlackへ通知
* 以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/09/000000
参考文献
https://blog.imind.jp/entry/2019/02/08/170332
https://k11i.biz/blog/2019/04/23/slack-notification-airflow-1-10-3/
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101