【Airflow】Apache Airflow ~ Variable / Connection ~

■ はじめに

 色々と Airflow を触っていると
設定値を保持する仕組み・機構が揃っているので、
メモしておく

目次

【1】設定値の保持 - Variable
 1)Web UIからデータの追加方法
  a) 値を追加
  b) ファイルからのインポート
 2)サンプル
  例1:値の取得
【2】コネクションの保持 - Connection
 1)Web UIからデータの追加方法
  a) 値を追加
 2)設定例
  例1:SlackのWebhook
 3)サンプル
  SlackWebhookOperatorを使ってSlackへ通知

【1】設定値の保持 - Variable

* 設定値を保持・管理してくれる機能(これって環境変数?)
* キーの値が、パスワード的なものだと、
 Web UIで見た際に「******」みたいな感じで表示してくれる

公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html

1)Web UIからデータの追加方法

* Web UI上で追加できる方法は、以下の通り。
* 他にも、CLIで追加する方法などがある
 => CI/CD時などで使えそう
 => CLIについては、以下のサイトを参照のこと

https://blog.imind.jp/entry/2019/04/22/184759

a) 値を追加

[1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択
[2] 「+ (Add a new record)」押下
[3] Key/Val にそれぞれ、設定キー/値を入力し「Save」ボタン押下

b) ファイルからのインポート

[0] 事前準備として、以下のようなJSONファイルを作成
 => key名にパスワードっぽいの名前を付けると
  Web UI表示時にマスクしてくれたりとかしてくれる
~~~~~~~~
{
  "your_slack_api_token": "xxxxxxxxx",
  "your_rds_username": "admin",
  "your_rds_password": "your-admin-password"
}  
~~~~~~~~

[1] Airflow の Web UI ページにアクセスし、[Admin]-[Variables]を選択
[2] 「Choose File」押下し、[0]のJSONファイルを選択し、
  「Import Variables」を押下

2)サンプル

例1:値の取得

import os

from airflow import DAG
from airflow.models import Variable # ★注目★
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  # ★注目★
  token = Variable.get("your_slack_api_token")
  rds_username = Variable.get("your_rds_username")
  rds_password = Variable.get("your_rds_password")
  # ★上記のJSONファイルをインポートしていれば、出力は以下のようになる
  # => token=xxxxxxxxx, rds_username=admin, rds_password=your-admin-password
  print(f"token={token}, rds_username={rds_username}, rds_password={rds_password}")

defalut_args = {
  start_date=days_ago(2),
  "provide_context": True
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task',
    dag=dag,
    python_callable=say_hello,
  )
  job1

【2】コネクションの保持 - Connection

* DBの接続情報 や Slack webhook用の接続情報を保持できる仕組み
* AWS Glue でいう Glue connectionみたいなもの。 

1)Web UIからデータの追加方法

a) 値を追加

[1] Airflow の Web UI ページにアクセスし、[Admin]-[Connections]を選択
[2] 「+ (Add a new record)」押下
[3] Conn id/Conn Typeなど にそれぞれ入力し「Save」ボタン押下

2)設定例

例1:SlackのWebhook

Slack の Webhook URLが、
https://hooks.slack.com/services/A00000000/B111111/XXXXXXXXXXXXXXXXXXXXXXXX  
の場合は、設定手順は以下の通り。

[1] 上記の「a) 値を追加」の[1]~[2] を実行
[2] 以下をそれぞれ入力し「Save」ボタン押下
 + Conn id : 任意の名前 (今回は「slack_webhook_demo」)
 + Conn Type : HTTP
 + Host : https://hooks.slack.com/services
 + Extra : {"webhook_token":"/A00000000/B111111/XXXXXXXXXXXXXXXXXXXXXXXX"}

3)サンプル

SlackWebhookOperatorを使ってSlackへ通知

* 以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/09/000000

参考文献

https://blog.imind.jp/entry/2019/02/08/170332
https://k11i.biz/blog/2019/04/23/slack-notification-airflow-1-10-3/

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101

【Slack】Slack ~ Incoming Webhooks ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/13/092235

の続き。

どうも Slackに対して、Tokenを使ってメッセージを表示させるより
Incoming Webhooks 経由で主流のようなので調べてみた。

調査してみて感じたのは、結構、古い内容が多々あるので
自分で動作確認しながら色々と試すのがいいと思う。

目次

【1】Slack Incoming Webhook
【2】Incoming Webhook の設定
 1)Slack Appを作成する
 2)Incoming Webhooksを有効にする
 3)動作確認
【3】その他設定
 1)特定のチャンネルにメッセージを表示させたい
 2)公式SDKを使用する
【4】サンプル
 例1:PythonからSlackへメッセージを送る(公式 SDKを使用しなかった場合)
 例2:PythonからSlackへメッセージを送る(公式 SDKを使用する場合)

【1】Slack Incoming Webhook

* 外部サービス/アプリ から Slack に対して、メッセージを送信するための機能
* 以下のサイトの説明が分かりやすいかも。

https://www.yukibnb.com/entry/slack_incoming_webhook_url#Incoming-Webhook-URL%E3%81%A8%E3%81%AF

【2】Incoming Webhook の設定

1)Slack Appを作成する
2)Incoming Webhooksを有効にする

=> 以下のサイトなどが参考になると思う。

https://christina04.hatenablog.com/entry/sending-messages-with-slack-app
https://slack.com/intl/ja-jp/help/articles/115005265063-Slack-%E3%81%A7%E3%81%AE-Incoming-Webhook-%E3%81%AE%E5%88%A9%E7%94%A8
https://www.yukibnb.com/entry/slack_incoming_webhook_url#%E7%B0%A1%E5%8D%98Slack%E3%81%AEIncoming-Webhook-URL%E3%82%92%E5%8F%96%E5%BE%97%E3%81%99%E3%82%8B%E6%96%B9%E6%B3%95

1)Slack Appを作成する

* 以下のサイトから、Slack Appを作成する

https://api.slack.com/apps

* 流れでできると思うが、以下の関連記事も参照。

https://dk521123.hatenablog.com/entry/2021/10/13/092235

2)Incoming Webhooksを有効にする

作成したSlack Appに対して、Incoming Webhooksを有効にする

[1] Slackのアプリ設定ページのサイドバー「Settings」の「Incoming Webhooks」を押下
[2] 「Activate Incoming Webhooks」をONにする

3)動作確認

* 有効にすると「Sample curl request to post to a channel:」に
 以下のコマンド例ような例が載っているので、実行し、動作確認してみる
 => 以下の例の場合、Slack上で「Hello World!!!」が表示されているはず。

https://slack.com/intl/ja-jp/

コマンド例(curlコマンドが使える場合)

curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello World!!!"}' https://hooks.slack.com/services/XXXXXXXXXXXXXXXX

【3】その他設定

1)特定のチャンネルにメッセージを表示させたい

[1] Slackのアプリ設定ページのサイドバー「Settings」の「Incoming Webhooks」を押下
[2] ページ下の方にある「Webhook URL」の「Add New Webhook to Workspace」を押下
[3] 「<アプリ名> の投稿先はどちらにしますか?」で指定のチャンネルを選択
[4] 「許可する」ボタン押下

2)公式SDKを使用する

* 公式SDKについては、以下のサイトを参照

公式サイト
https://slack.dev/python-slack-sdk/
https://slack.dev/python-slack-sdk/webhook/index.html
一般サイト
https://qiita.com/seratch/items/8f93fd0bf815b0b1d557

公式 SDKをインストールする

# 以下を実行し、公式 SDKをインストール
pip install slack_sdk

【4】サンプル

例1:PythonからSlackへメッセージを送る(公式 SDKを使用しなかった場合)

requestsをインストールしていない場合

# 以下を実行し、requestsモジュールをインストール
pip install requests

Pythonコード

import json
import requests

WEBHOOK_URL = "https://hooks.slack.com/services/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

print("Start")
print("***************")

response = requests.post(
  WEBHOOK_URL,
  headers={'Content-Type': 'application/json'},
  data=json.dumps({
    'text': 'こんにちは世界',
  })
)
# "ok"って返ってきた
print(response.text)

print("***************")
print("Done")

例2:PythonからSlackへメッセージを送る(公式 SDKを使用する場合)

Pythonコード

from slack_sdk.webhook import WebhookClient

WEBHOOK_URL = "https://hooks.slack.com/services/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

print("Start")
print("***************")

client = WebhookClient(WEBHOOK_URL)
response = client.send(text="こんにちは世界です!!")

# 「200」が表示
print(response.status_code)
# 「ok」が表示
print(response.body)

print("***************")
print("Done")

参考文献

https://ponsuke-tarou.hatenablog.com/entry/2019/01/05/171355
https://www.k-hitorigoto.online/entry/2017/07/18/070000

関連記事

Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323

【AWS】Amazon SNS ~ 基本編 / Email ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/06/03/175213

の続きで、Amazon Simple Notification Service(SNS)について。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

で、Airflow において、
 Amazon SNS経由でEmailを受信することになったので
メモしておく

目次

【1】Amazon SES との違い
【2】構築手順
 1)トピックを作成する
 2)Email用のサブスクリプションを作成する
 3)Email受信を許可する
 4)動作確認
【3】サンプル
 例1:AirflowでSNS経由でEmailを受信する

【1】Amazon SES との違い

* SES は、フルマネージド側Emailサーバ的なサービス(送受信)
* SNS は、Emailを含めたユーザへの通知を重きをおいた
 フルマネージド側サービス

比較表

* それ以外の違いは、以下の比較表を参照のこと。
 => 使ってみると、結構、違いがあるので、
  サービス採用する際に知っておいた方がいいかも。
 => 細かいことをやりたい場合は、SESを選択しておいた方がいいかも。
 => Amazon SES については、以下の関連記事を参照とのこと

https://dk521123.hatenablog.com/entry/2017/04/28/234103

# 項目 SNS SES 備考
1 HTMLメール 不可 可能 SNSはTextのみ
2 添付ファイル 不可 可能
3 Fromメールアドレスの事前登録 不要 必要 SNSはno-reply@sns.amazonaws.comから送られてくる

参考文献
https://qiita.com/shiro01/items/94aad0210917422d105f

* 現状、Amazon SESは、東京リージョンも対応されている模様
 => 以下の公式サイトを参照のこと。

https://aws.amazon.com/jp/blogs/news/amazon-ses-tokyo/

【2】構築手順

* Amazon SNSでEmailを設定する手順は以下の通り。

1)トピックを作成する
2)Email用のサブスクリプションを作成する
3)Email受信を許可する

1)トピックを作成する

[1] AWS管理コンソールでSNSページを表示する
[2] 「トピックの作成」ボタン押下
[3] 以下を選択し、「トピックの作成」ボタン押下
(特に指定しない項目は、デフォルトのまま)

 + タイプ:スタンダード
 + 名前:任意(今回は「hello-world-sns-demo」)
 + 表示名:任意(今回は「hello-world-sns-demo」)

2)Email用のサブスクリプションを作成する

「1)トピックを作成する」の流れで、、、

[1] 「サブスクリプションの作成」ボタン押下
[2] 以下を選択し、「サブスクリプションの作成」ボタン押下
(特に指定しない項目は、デフォルトのまま)
 + プロトコル:Eメール
 + エンドポイント:対象のEmailアドレス
 => 「Amazon SNSから通知を受信できるメールアドレス」
  って表示されているのでわかると思う

3)Email受信を許可する

 「2)Email用のサブスクリプションを作成する」を行うと
エンドポイントに指定されたEmailアドレスに以下のメールが届く。

 + タイトル:AWS Notification - Subscription Confirmation
 + From:{Topic name}<no-reply@sns.amazonaws.com>
({Topic name}は、今回は「hello-world-sns-demo」)

そのメール内のリンク「Confirm subscription」を押下する
 => これで、SNSでメール受信することを承諾したことになる。

4)動作確認

「3)Email受信を許可する」の流れで、、、

[1] 作成したトピックのページ内にある「メッセージの発行」ボタン押下
[2] 以下を入力し、「メッセージの発行」ボタン押下
 + 件名:任意のASCII文字(今回「Hello World」) ... ※
 + エンドポイントに送信するメッセージ本文:任意(今回「こんにちは世界」)

 => 設定したアドレスにメールが届いているはず。

※ 使用上の注意

* 件名に日本語は入力できない(入力したらエラー表示)
 => ただ、API経由なら使えるっぽい

【3】サンプル

例1:AirflowでSNS経由でEmailを受信する

* 以下の関連記事を参照のこと。

Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000

関連記事

Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000