■ はじめに
https://dk521123.hatenablog.com/entry/2021/10/06/141323
の続き。 通知に関するサンプルをまとめておく。
目次
例1:SlackAPIPostOperatorを使ってSlackへ通知 1)前提条件 2)サンプル 例2:SlackWebhookOperatorを使ってSlackへ通知 1)前提条件 2)サンプル 例3:AWS SNS で通知する 1)前提条件 2)サンプル
例1:SlackAPIPostOperatorを使ってSlackへ通知
* SlackAPIPostOperatorについては、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/06/141323
1)前提条件
* 以下は、接続情報の保持として「Variable」を使用している => Variableの設定方法など詳細は、以下の関連記事を参照のこと。
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
2)サンプル
import os from airflow import DAG from airflow.utils.dates import days_ago from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.operators.slack_operator import SlackAPIPostOperator def say_hello(): print('hello world') # raise Exception("Test from say_hello") def say_hi(): print('Hi, World') # raise Exception("Test from say_hi") def send_slack_for_failure(context): ''' Callback for failure ''' print("fire send_slack_for_failure") # try-catch しとかないとエラーの原因が分からないかも、、、 try: slack_failure = SlackAPIPostOperator( task_id='slack_failure_task', token=Variable.get("your_slack_api_token"), text=str(context['exception']), channel='#your-room-name', username='admin-user') return slack_failure.execute(context=context) except Exception as ex: print("[Error] " + str(ex)) raise ex def send_slack_for_success(context): ''' Callback for success ''' print("fire send_slack_for_success") # try-catch しとかないとエラーの原因が分からないかも、、、 try: slack_success = SlackAPIPostOperator( task_id='slack_success_task', token=Variable.get("your-access-token"), text="Your workflow is successful!!", channel='#your-room-name', username='admin-user') return slack_success.execute(context=context) except Exception as ex: print("[Error] " + str(ex)) raise ex default_args = { "start_date": days_ago(2), "provide_context": True, "on_failure_callback": send_slack_for_failure } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, description='This is a simple demo DAG for Hello World', schedule_interval=None, tags=['slack_test'], ) as dag: job1 = PythonOperator( task_id='say_hello', python_callable=say_hello, dag=dag ) job2 = PythonOperator( task_id='say_hi', python_callable=say_hi, on_success_callback=send_slack_for_success, dag=dag ) job1 >> job2
例2:SlackWebhookOperatorを使ってSlackへ通知
* SlackWebhookOperator については、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/06/141323
1)前提条件
* Slack の Incoming Webhooks の設定を行う => 以下の関連記事を参照のこと。
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842
* Connection の設定(「http_conn_id='slack_webhook_demo'」を参照) => 以下の関連記事を参照のこと。
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
2)サンプル
import os from airflow import DAG from airflow.utils.dates import days_ago from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator def say_hello(): print('hello world') # raise Exception("Test from say_hello") def say_hi(): print('Hi, World') # raise Exception("Test from say_hi") def send_slack_for_failure(context): ''' Callback for failure ''' print("fire send_slack_for_failure") # try-catch しとかないとエラーの原因が分からないかも、、、 try: slack_failure = SlackWebhookOperator( task_id='slack_task_for_failure', http_conn_id='slack_webhook_demo', icon_emoji=':cat:', message='*Hello, world!* \nThis is a message for failure' ) return slack_failure.execute(context=context) except Exception as ex: print("[Error] " + str(ex)) raise ex def send_slack_for_success(context): ''' Callback for success ''' print("fire send_slack_for_success") # try-catch しとかないとエラーの原因が分からないかも、、、 try: slack_success = SlackWebhookOperator( task_id='slack_task_for_success', http_conn_id='slack_webhook_demo', icon_emoji=':dog:', message='*Hello, world!* \nThis is a message for success' ) return slack_success.execute(context=context) except Exception as ex: print("[Error] " + str(ex)) raise ex default_args = { "start_date": days_ago(2), "provide_context": True, "on_failure_callback": send_slack_for_failure } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, description='This is a simple demo DAG for Hello World', schedule_interval=None, tags=['slack_test'], ) as dag: job1 = PythonOperator( task_id='say_hello', python_callable=say_hello, dag=dag ) job2 = PythonOperator( task_id='say_hi', python_callable=say_hi, on_success_callback=send_slack_for_success, dag=dag ) job1 >> job2
例3:AWS SNS で通知する
* SnsPublishOperator については、以下の関連記事を参照のこと。
https://dk521123.hatenablog.com/entry/2021/10/06/141323
1)前提条件
* SNSの設定をしておく => 詳細は以下の関連記事を参照のこと。
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
2)サンプル
import os from airflow import DAG from airflow.utils.dates import days_ago from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator def say_hello(): print('hello world') # raise Exception("Test from say_hello") def say_hi(): print('Hi, World') # raise Exception("Test from say_hi") def send_email_for_failure(context): ''' Callback for failure ''' print("Fire send_email_for_failure") try: email_failure = SnsPublishOperator( task_id='publish_sns_failure_task', target_arn='arn:aws:sns:us-west-2-xxxxxxx', subject='This is a title for failure', message='Hello World for failure') return email_failure.execute(context=context) except Exception as ex: print("[Error] " + str(ex)) raise ex def send_email_for_success(context): ''' Callback for success ''' print("Fire send_email_for_success") try: email_success = SnsPublishOperator( task_id='publish_sns_success_task', target_arn='arn:aws:sns:us-west-2-xxxxxxx', subject='This is a title for success', message='Hello World for success' ) return email_success.execute(context=context) except Exception as ex: print("[Error] " + str(ex)) raise ex default_args = { "start_date": days_ago(2), "provide_context": True, "on_failure_callback": send_email_for_failure } with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, description='This is a simple demo DAG for Hello World', schedule_interval=None, tags=['sns_test'], ) as dag: job1 = PythonOperator( task_id='say_hello', python_callable=say_hello, dag=dag ) job2 = PythonOperator( task_id='say_hi', python_callable=say_hi, on_success_callback=send_email_for_success, dag=dag ) job1 >> job2
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842