【Airflow】Apache Airflow ~ 通知あれこれ編 ~

■ はじめに

Apache Airflow で処理の失敗時やワークフロー成功時に
以下のサービスへ通知する方法を調べたので、メモっておく。
~~~~~~~~~~
【1】Slack
【2】Email / AWS SES
【3】AWS SNS
【4】Microsoft Teams
~~~~~~~~~~

目次

【0】全般的な注意点
 1)メッセージ内容が日本語を使用する場合
【1】Slack
 0)注意事項
 1)SlackAPIOperator
 2)SlackAPIPostOperator
 3)SlackAPIFileOperator
 4)SlackWebhookOperator
【2】Email / AWS SES
 0)注意事項
 1)EmailOperator
 2)SESHook
【3】AWS SNS
 0)注意事項
 1)SnsPublishOperator
 2)AwsSnsHook
【4】Microsoft Teams
【5】通知に関する技術事項
 1)実装せずにタスク失敗をEmail通知する
 2)タスク成功・失敗時に通知する
 3)接続情報の保持

なお、サンプルについては、以下の関連記事を参照のこと。

Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000

【0】全般的な注意点

1)メッセージ内容が日本語を使用する場合

* DAGのPythonコードを、
 ファイル形式「UTF-8」で保存すること

 => 日本語のメッセージでテストした際にWeb UI側で
  エラー「SyntaxError: ... 'utf-8' codec can't decode ...」が
  表示されたので、何かなーって原因を探ったら、ファイル形式だった
 => 詳細は、以下の関連記事を参照のこと。

Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000

【1】Slack

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/operators/slack/index.html
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html

0)注意事項

* Slack の Access Token を用意する必要がある
 => トークンに関しては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/13/092235

* WebHook の場合は、 Incoming Webhookを有効にする

https://dk521123.hatenablog.com/entry/2021/10/15/091842

1)SlackAPIOperator

* 以下の1)~2)の親クラス

2)SlackAPIPostOperator

* Slack チェンネルへのPost送信を実行

構文

from airflow.operators.slack_operator import SlackAPIPostOperator

slack = SlackAPIPostOperator(
    task_id="post_hello",
    dag=dag,
    token="XXX",
    text="hello there!",
    channel="#random",
)

3)SlackAPIFileOperator

* Slack チャンネルへのファイル送信を実行

構文

# Send file with filename and filetype
slack = SlackAPIPostOperator(
    task_id="post_hello",
    dag=dag,
    token="XXX",
    text="hello there!",
    channel="#random",
)

4)SlackWebhookOperator

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html

* Webhook については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/15/091842

* Connection を使用する(「http_conn_id='slack_webhook'」部分)場合は、
 以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/16/000454

構文

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_job = SlackWebhookOperator(
  task_id='slack_task',
  # Connectionで指定する(webhook_tokenでトークンで指定することも可能)
  http_conn_id='slack_webhook',
  message='*Hello, world!* \nThis is a message)',
  username='slack-user-name',
  icon_emoji=':dog:',
  channel='#your-channel-name',
  dag=dag
)

コード
http://airflow.apache.org/docs/apache-airflow/1.10.13/_modules/airflow/contrib/operators/slack_webhook_operator.html

参考文献
https://qiita.com/munaita_/items/ad0f6f9590185741fff6

【2】Email / AWS SES

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/email/index.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/hooks/ses/index.html

* なお、Amazon SES に関する詳細は、以下の関連記事を参照のこと

Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103

0)注意事項

* AirflowにSMTPの接続情報を設定する必要がある

設定方法

[1] airflow.cfgで設定する方法
 => 詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html

[2] 環境変数で指定する方法
 => 詳細は、以下のサイトを参照。

https://dev.classmethod.jp/articles/apache-airflow-send-mail-using-amazon-ses/

* 設定値については、以下の公式ドキュメントを参照。

設定方法 / AWS MWAA

* MWAA(Amazon Managed Workflow for Apache Airflow)での設定は
 以下のサイトを参照のこと(基本的にやることは、前述と変わらない)

https://qiita.com/pioho07/items/58c97bbfa8163f713d80

Email
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html?highlight=email#email
SMTP (Simple Mail Transfer Protocol)
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#smtp

1)EmailOperator

* Email送信する
* 添付ファイルも引数「files (list)」で送れそう

構文

from airflow.operators.email import EmailOperator

send_mail = EmailOperator(
  task_id="sendmail",
  to="to_address@xxxx.com",
  cc="cc_address@xxxx.com",
  bcc="bcc_address@xxxx.com",
  subject="This is a title",
  html_content="<b>Hello World</b>",
  mime_charset="utf-8",
)

2)SESHook

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/sns/index.html

* SES(Amazon Simple Email Service) のフック関数
 => 使うことあるか分からんが、、、

参考文献
https://dev.classmethod.jp/articles/apache-airflow-send-mail-using-amazon-ses/
https://qiita.com/pioho07/items/58c97bbfa8163f713d80
https://bhavaniravi.com/blog/sending-emails-from-airflow/

【3】AWS SNS

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/sns_publish_operator/index.html
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/hooks/aws_sns_hook/index.html

* 他のサービスと比較すると、あまり参考になるサイトがない印象。
 => 以下のサイトくらい?

https://www.mikulskibartosz.name/send-sms-from-airflow-using-aws-sns/

* なお、Amazon SNS に関する詳細は、以下の関連記事を参照のこと

Amazon SNS ~ 通知サービス ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213

0)注意事項

* 以下を行う必要がある
 [1] AWS SNSで、トピックとサブスクリプションを追加

1)SnsPublishOperator

* Amazon SNSへメッセージをPublishする

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/sns_publish_operator/index.html

構文

from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator

publish_sns = SnsPublishOperator(
  task_id='publish_sns_task',
  aws_conn_id='your_sns_connection_id',
  target_arn='your_sns_topic_arn',
  subject='This is a title',
  message='Hello World'
)

コード

* 内部で「2)AwsSnsHook」を呼び出している

https://airflow.apache.org/docs/apache-airflow/1.10.12/_modules/airflow/contrib/operators/sns_publish_operator.html

補足:引数「aws_conn_id」「target_arn」

* aws_conn_id : aws connection to use
 => 使用するAWSコネクション??

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html

 => 以下のサイトが参考になりそう。

https://dev.classmethod.jp/articles/apache-airflow-aws-conn-assume-role/

* target_arn : either a TopicArn or an EndpointArn
 => トピックARNかエンドポイントARNのどちらか
 => トピックの ARN「arn:aws:sns:<RegionName>-xxxxxxx」
  を指定すればよさそう

2)AwsSnsHook

* SNS への フック関数

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/hooks/aws_sns_hook/index.html

* コードを参照したところ、boto3 API - public(通知依頼) を使用している模様

コード
http://airflow.apache.org/docs/apache-airflow/1.10.6/_modules/airflow/contrib/hooks/aws_sns_hook.html
boto3 API - public(通知依頼)
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#SNS.Client.publish

【4】Microsoft Teams

* HTTP Post送信でできそう。
 => 詳細は、以下の参考文献を参照。

参考文献
https://qiita.com/whata/items/dba1c2db1a38d9d4a4c0
https://zenn.dev/antyuntyun/articles/airflow_custom_notification

【5】通知に関する技術事項

1)実装せずにタスク失敗をEmail通知する

* Email送信であれば「'email_on_failure': True」って設定がある

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/index.html

より抜粋
~~~~~~~~~~~~
email_on_failure (bool)
 – Indicates whether email alerts should be sent when a task failed
=> タスク失敗時に、Emailアラートを送るかどうかを示す
~~~~~~~~~~~~

Pythonコード・一部抜粋

default_args = {
  'email': ['xxxxx@gmail.com'],
  'email_on_failure': True,
  'email_on_retry': True,
}

使用上の注意

* 「【2】Email / AWS SES」の「0)注意事項」で記載しているのと同様に
 「AirflowにSMTPの接続情報を設定する必要がある」。
 => 何も設定せずに行うと、ログに以下のエラーが表示されてしまう。

~~~~~~~~
ConnectionRefusedError: [Errno 111] Connection refused
~~~~~~~~

https://github.com/puckel/docker-airflow/issues/425

2)タスク成功・失敗時に通知する

* コールバックが用意されているので
 失敗時:on_failure_callback
 成功時:on_success_callback

* default_argsに設定すればDAG内の全タスクに適応可能
 => 失敗時は、default_argsに設定し、
  成功時は、最後のタスクに設定すればよさそう

実装イメージ

def send_for_failure(context):
  try:
    # 例外内容
    exception_message = str(context['exception'])
    # タスク インスタンス(※1参照)
    task_instance = context.get('task_instance')
    # Task ID
    task_id = task_instance.task_id
    # DAG ID
    task_id = task_instance.dag_id
    # Execution Date
    task_id = task_instance.execution_date

    # 以降は、送信する処理(省略)
  except Exception as ex:
    print("Error " + str(ex))
    raise ex

※1:タスク インスタンス

* タスク インスタンスの詳細は、以下の公式ドキュメントを参照。

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html

参考文献
https://qiita.com/munaita_/items/1a5b131839e01ea7280d
https://blog.imind.jp/entry/2019/02/08/170332
https://medium.com/@momota/airflow-dag%E3%81%AB%E3%81%8A%E3%81%91%E3%82%8B%E5%90%84%E3%82%BF%E3%82%B9%E3%82%AF%E3%81%AE%E6%88%90%E5%8A%9For%E5%A4%B1%E6%95%97%E3%82%92slack%E3%81%AB%E9%80%9A%E7%9F%A5%E3%81%99%E3%82%8B-5eecda9ec378

3)接続情報の保持

* Tokenなどの接続情報の保持は、「Variable」を使用していると良さそう
 => Variableの設定方法など詳細は、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/09/30/163020

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842