■ はじめに
Apache Airflow で処理の失敗時やワークフロー成功時に 以下のサービスへ通知する方法を調べたので、メモっておく。 ~~~~~~~~~~ 【1】Slack 【2】Email / AWS SES 【3】AWS SNS 【4】Microsoft Teams ~~~~~~~~~~
目次
【0】全般的な注意点 1)メッセージ内容が日本語を使用する場合 【1】Slack 0)注意事項 1)SlackAPIOperator 2)SlackAPIPostOperator 3)SlackAPIFileOperator 4)SlackWebhookOperator 【2】Email / AWS SES 0)注意事項 1)EmailOperator 2)SESHook 【3】AWS SNS 0)注意事項 1)SnsPublishOperator 2)AwsSnsHook 【4】Microsoft Teams 【5】通知に関する技術事項 1)実装せずにタスク失敗をEmail通知する 2)タスク成功・失敗時に通知する 3)接続情報の保持 なお、サンプルについては、以下の関連記事を参照のこと。
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
【0】全般的な注意点
1)メッセージ内容が日本語を使用する場合
* DAGのPythonコードを、 ファイル形式「UTF-8」で保存すること => 日本語のメッセージでテストした際にWeb UI側で エラー「SyntaxError: ... 'utf-8' codec can't decode ...」が 表示されたので、何かなーって原因を探ったら、ファイル形式だった => 詳細は、以下の関連記事を参照のこと。
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
【1】Slack
* 関連するAPI仕様の詳細は、以下のサイトを参照。
https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/operators/slack/index.html
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html
0)注意事項
* Slack の Access Token を用意する必要がある => トークンに関しては、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/13/092235
* WebHook の場合は、 Incoming Webhookを有効にする
https://dk521123.hatenablog.com/entry/2021/10/15/091842
1)SlackAPIOperator
* 以下の1)~2)の親クラス
2)SlackAPIPostOperator
* Slack チェンネルへのPost送信を実行
構文
from airflow.operators.slack_operator import SlackAPIPostOperator slack = SlackAPIPostOperator( task_id="post_hello", dag=dag, token="XXX", text="hello there!", channel="#random", )
3)SlackAPIFileOperator
* Slack チャンネルへのファイル送信を実行
構文
# Send file with filename and filetype slack = SlackAPIPostOperator( task_id="post_hello", dag=dag, token="XXX", text="hello there!", channel="#random", )
4)SlackWebhookOperator
* 関連するAPI仕様の詳細は、以下のサイトを参照。
* Webhook については、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/15/091842
* Connection を使用する(「http_conn_id='slack_webhook'」部分)場合は、 以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/16/000454
構文
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator slack_job = SlackWebhookOperator( task_id='slack_task', # Connectionで指定する(webhook_tokenでトークンで指定することも可能) http_conn_id='slack_webhook', message='*Hello, world!* \nThis is a message)', username='slack-user-name', icon_emoji=':dog:', channel='#your-channel-name', dag=dag )
参考文献
https://qiita.com/munaita_/items/ad0f6f9590185741fff6
【2】Email / AWS SES
* 関連するAPI仕様の詳細は、以下のサイトを参照。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/email/index.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/hooks/ses/index.html
* なお、Amazon SES に関する詳細は、以下の関連記事を参照のこと
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
0)注意事項
* AirflowにSMTPの接続情報を設定する必要がある
設定方法
[1] airflow.cfgで設定する方法 => 詳細は、以下のサイトを参照。
https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html
[2] 環境変数で指定する方法 => 詳細は、以下のサイトを参照。
https://dev.classmethod.jp/articles/apache-airflow-send-mail-using-amazon-ses/
* 設定値については、以下の公式ドキュメントを参照。
設定方法 / AWS MWAA
* MWAA(Amazon Managed Workflow for Apache Airflow)での設定は 以下のサイトを参照のこと(基本的にやることは、前述と変わらない)
https://qiita.com/pioho07/items/58c97bbfa8163f713d80
Email
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html?highlight=email#email
SMTP (Simple Mail Transfer Protocol)
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#smtp
1)EmailOperator
* Email送信する * 添付ファイルも引数「files (list)」で送れそう
構文
from airflow.operators.email import EmailOperator send_mail = EmailOperator( task_id="sendmail", to="to_address@xxxx.com", cc="cc_address@xxxx.com", bcc="bcc_address@xxxx.com", subject="This is a title", html_content="<b>Hello World</b>", mime_charset="utf-8", )
2)SESHook
* SES(Amazon Simple Email Service) のフック関数 => 使うことあるか分からんが、、、
参考文献
https://dev.classmethod.jp/articles/apache-airflow-send-mail-using-amazon-ses/
https://qiita.com/pioho07/items/58c97bbfa8163f713d80
https://bhavaniravi.com/blog/sending-emails-from-airflow/
【3】AWS SNS
* 関連するAPI仕様の詳細は、以下のサイトを参照。
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/sns_publish_operator/index.html
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/hooks/aws_sns_hook/index.html
* 他のサービスと比較すると、あまり参考になるサイトがない印象。 => 以下のサイトくらい?
https://www.mikulskibartosz.name/send-sms-from-airflow-using-aws-sns/
* なお、Amazon SNS に関する詳細は、以下の関連記事を参照のこと
Amazon SNS ~ 通知サービス ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
0)注意事項
* 以下を行う必要がある [1] AWS SNSで、トピックとサブスクリプションを追加
1)SnsPublishOperator
* Amazon SNSへメッセージをPublishする
構文
from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator publish_sns = SnsPublishOperator( task_id='publish_sns_task', aws_conn_id='your_sns_connection_id', target_arn='your_sns_topic_arn', subject='This is a title', message='Hello World' )
コード
* 内部で「2)AwsSnsHook」を呼び出している
補足:引数「aws_conn_id」「target_arn」
* aws_conn_id : aws connection to use => 使用するAWSコネクション??
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html
=> 以下のサイトが参考になりそう。
https://dev.classmethod.jp/articles/apache-airflow-aws-conn-assume-role/
* target_arn : either a TopicArn or an EndpointArn => トピックARNかエンドポイントARNのどちらか => トピックの ARN「arn:aws:sns:<RegionName>-xxxxxxx」 を指定すればよさそう
2)AwsSnsHook
* SNS への フック関数
* コードを参照したところ、boto3 API - public(通知依頼) を使用している模様
コード
http://airflow.apache.org/docs/apache-airflow/1.10.6/_modules/airflow/contrib/hooks/aws_sns_hook.html
boto3 API - public(通知依頼)
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#SNS.Client.publish
【4】Microsoft Teams
* HTTP Post送信でできそう。 => 詳細は、以下の参考文献を参照。
参考文献
https://qiita.com/whata/items/dba1c2db1a38d9d4a4c0
https://zenn.dev/antyuntyun/articles/airflow_custom_notification
【5】通知に関する技術事項
1)実装せずにタスク失敗をEmail通知する
* Email送信であれば「'email_on_failure': True」って設定がある
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/index.html
より抜粋 ~~~~~~~~~~~~ email_on_failure (bool) – Indicates whether email alerts should be sent when a task failed => タスク失敗時に、Emailアラートを送るかどうかを示す ~~~~~~~~~~~~
Pythonコード・一部抜粋
default_args = { 'email': ['xxxxx@gmail.com'], 'email_on_failure': True, 'email_on_retry': True, }
使用上の注意
* 「【2】Email / AWS SES」の「0)注意事項」で記載しているのと同様に 「AirflowにSMTPの接続情報を設定する必要がある」。 => 何も設定せずに行うと、ログに以下のエラーが表示されてしまう。 ~~~~~~~~ ConnectionRefusedError: [Errno 111] Connection refused ~~~~~~~~
https://github.com/puckel/docker-airflow/issues/425
2)タスク成功・失敗時に通知する
* コールバックが用意されているので 失敗時:on_failure_callback 成功時:on_success_callback * default_argsに設定すればDAG内の全タスクに適応可能 => 失敗時は、default_argsに設定し、 成功時は、最後のタスクに設定すればよさそう
実装イメージ
def send_for_failure(context): try: # 例外内容 exception_message = str(context['exception']) # タスク インスタンス(※1参照) task_instance = context.get('task_instance') # Task ID task_id = task_instance.task_id # DAG ID task_id = task_instance.dag_id # Execution Date task_id = task_instance.execution_date # 以降は、送信する処理(省略) except Exception as ex: print("Error " + str(ex)) raise ex
※1:タスク インスタンス
* タスク インスタンスの詳細は、以下の公式ドキュメントを参照。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html
参考文献
https://qiita.com/munaita_/items/1a5b131839e01ea7280d
https://blog.imind.jp/entry/2019/02/08/170332
https://medium.com/@momota/airflow-dag%E3%81%AB%E3%81%8A%E3%81%91%E3%82%8B%E5%90%84%E3%82%BF%E3%82%B9%E3%82%AF%E3%81%AE%E6%88%90%E5%8A%9For%E5%A4%B1%E6%95%97%E3%82%92slack%E3%81%AB%E9%80%9A%E7%9F%A5%E3%81%99%E3%82%8B-5eecda9ec378
3)接続情報の保持
* Tokenなどの接続情報の保持は、「Variable」を使用していると良さそう => Variableの設定方法など詳細は、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/09/30/163020
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842