【Airflow】Apache Airflow ~ 通知サンプル編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/06/141323

の続き。

通知に関するサンプルをまとめておく。

目次

例1:SlackAPIPostOperatorを使ってSlackへ通知
 1)前提条件
 2)サンプル
例2:SlackWebhookOperatorを使ってSlackへ通知
 1)前提条件
 2)サンプル
例3:AWS SNS で通知する
 1)前提条件
 2)サンプル

例1:SlackAPIPostOperatorを使ってSlackへ通知

* SlackAPIPostOperatorについては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

1)前提条件

* 以下は、接続情報の保持として「Variable」を使用している
 => Variableの設定方法など詳細は、以下の関連記事を参照のこと。

Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454

2)サンプル

import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator


def say_hello():
  print('hello world')
  # raise Exception("Test from say_hello")

def say_hi():
  print('Hi, World')
  # raise Exception("Test from say_hi")

def send_slack_for_failure(context):
  ''' Callback for failure
  '''
  print("fire send_slack_for_failure")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_failure = SlackAPIPostOperator(
      task_id='slack_failure_task',
      token=Variable.get("your_slack_api_token"),
      text=str(context['exception']),
      channel='#your-room-name',
      username='admin-user')
    return slack_failure.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

def send_slack_for_success(context):
  ''' Callback for success
  '''
  print("fire send_slack_for_success")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_success = SlackAPIPostOperator(
      task_id='slack_success_task',
      token=Variable.get("your-access-token"),
      text="Your workflow is successful!!",
      channel='#your-room-name',
      username='admin-user')
    return slack_success.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

default_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "on_failure_callback": send_slack_for_failure
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=None,
  tags=['slack_test'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag
  )
  job2 = PythonOperator(
    task_id='say_hi',
    python_callable=say_hi,
    on_success_callback=send_slack_for_success,
    dag=dag
  )
  job1 >> job2

例2:SlackWebhookOperatorを使ってSlackへ通知

* SlackWebhookOperator については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

1)前提条件

* Slack の Incoming Webhooks の設定を行う
 => 以下の関連記事を参照のこと。

Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842

* Connection の設定(「http_conn_id='slack_webhook_demo'」を参照)
 => 以下の関連記事を参照のこと。

Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454

2)サンプル

import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator


def say_hello():
  print('hello world')
  # raise Exception("Test from say_hello")

def say_hi():
  print('Hi, World')
  # raise Exception("Test from say_hi")

def send_slack_for_failure(context):
  ''' Callback for failure
  '''
  print("fire send_slack_for_failure")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_failure = SlackWebhookOperator(
      task_id='slack_task_for_failure',
      http_conn_id='slack_webhook_demo',
      icon_emoji=':cat:',
      message='*Hello, world!* \nThis is a message for failure'
    )
    return slack_failure.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

def send_slack_for_success(context):
  ''' Callback for success
  '''
  print("fire send_slack_for_success")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_success = SlackWebhookOperator(
      task_id='slack_task_for_success',
      http_conn_id='slack_webhook_demo',
      icon_emoji=':dog:',
      message='*Hello, world!* \nThis is a message for success'
    )
    return slack_success.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

default_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "on_failure_callback": send_slack_for_failure
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=None,
  tags=['slack_test'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag
  )
  job2 = PythonOperator(
    task_id='say_hi',
    python_callable=say_hi,
    on_success_callback=send_slack_for_success,
    dag=dag
  )
  job1 >> job2

例3:AWS SNS で通知する

* SnsPublishOperator については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

1)前提条件

* SNSの設定をしておく
 => 詳細は以下の関連記事を参照のこと。

Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313

2)サンプル

import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator


def say_hello():
  print('hello world')
  # raise Exception("Test from say_hello")

def say_hi():
  print('Hi, World')
  # raise Exception("Test from say_hi")

def send_email_for_failure(context):
  ''' Callback for failure
  '''
  print("Fire send_email_for_failure")
  try:
    email_failure = SnsPublishOperator(
      task_id='publish_sns_failure_task',
      target_arn='arn:aws:sns:us-west-2-xxxxxxx',
      subject='This is a title for failure',
      message='Hello World for failure')
    return email_failure.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

def send_email_for_success(context):
  ''' Callback for success
  '''
  print("Fire send_email_for_success")
  try:
    email_success = SnsPublishOperator(
      task_id='publish_sns_success_task',
      target_arn='arn:aws:sns:us-west-2-xxxxxxx',
      subject='This is a title for success',
      message='Hello World for success'
    )
    return email_success.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

default_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "on_failure_callback": send_email_for_failure
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=None,
  tags=['sns_test'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag
  )
  job2 = PythonOperator(
    task_id='say_hi',
    python_callable=say_hi,
    on_success_callback=send_email_for_success,
    dag=dag
  )
  job1 >> job2

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842