【Airflow】Apache Airflow ~ リトライ ~

■ はじめに

Apache Airflow の リトライ について扱う。

目次

【1】手動でリトライ - 再実行
【2】airflow.cfgでの制御 - Airflow全体の設定
 1)default_task_retries
 2)max_db_retries
【3】タスクのリトライ関連のプロパティ
 1)retries
 2)retry_delay
 3)retry_exponential_backoff
 4)max_retry_delay
 5)on_retry_callback
 6)使用イメージ
【4】サンプル
 例1:実験コード

【1】手動でリトライ - 再実行

* Web UI や コマンド を通して、可能
 => 途中のタスクから実行など細かく再実行できる
 => 詳細は、以下のサイトを参照。

https://future-architect.github.io/articles/20200131/#2-%E3%83%AA%E3%83%88%E3%83%A9%E3%82%A4
https://blog.imind.jp/entry/2019/02/22/000049

【2】airflow.cfgでの制御 - Airflow全体の設定

1)default_task_retries
2)max_db_retries

1)default_task_retries

* デフォルトのタスクのリトライ回数

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#default-task-retries

New in version 1.10.6.

The number of retries each task is going to have by default.
Can be overridden at dag or task level.

2)max_db_retries

* v2.0.0から
* DB Operational Errorsの場合のリトライ回数

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-db-retries

New in version 2.0.0.

Number of times the code should be retried in case of DB Operational Errors.
Not all transactions will be retried as it can cause undesired state.
Currently it is only used in DagFileProcessor.process_file to retry dagbag.sync_to_db.

【3】タスクのリトライ関連のプロパティ

* タスクのリトライに関して、関連する設定は、以下の通り。

1)retries
2)retry_delay
3)retry_exponential_backoff
4)max_retry_delay
5)on_retry_callback

https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/operators/index.html#package-contents

1)retries

* リトライ回数

API仕様より

retries (int)
 – the number of retries that should be performed
  before failing the task
 – タスクが失敗になる前に、実行されるリトライ回数

2)retry_delay

* リトライ時の遅延時間

API仕様より

retry_delay (datetime.timedelta)
 – delay between retries
 – リトライ間の遅延

3)retry_exponential_backoff

* 指数関数的後退アルゴリズム(※ 補足1参照)を使った
 リトライを行うかどうか

API仕様より

retry_exponential_backoff (bool)
 – allow progressive longer waits between retries
  by using exponential backoff algorithm on retry delay 
 (delay will be converted into seconds)
 – リトライ遅延での指数関数的後退アルゴリズムによる
 リトライ間隔の待ち時間を増加させるかどうか

※ 補足1:指数関数的後退アルゴリズム / exponential backoff algorithm

* 指数関数的後退アルゴリズム
 => リトライ回数が増えるごとに待ち時間を指数関数的に増やすアルゴリズム
 => 例:0.5秒、1秒、2秒、4秒、8秒, ... <= 固定じゃない
* 詳細は、以下のサイトを参照。

https://codezine.jp/article/detail/10739
https://note.com/artrigger_jp/n/n0795148b062d
https://yoshidashingo.hatenablog.com/entry/2014/08/17/135017

4)max_retry_delay

* リトライ間の最大遅延間隔(「2)retry_delay」とどう違う?)

API仕様より

max_retry_delay (datetime.timedelta)
 – maximum delay interval between retries
 – リトライ間の最大遅延間隔

5)on_retry_callback

* リトライ時のコールバック関数

API仕様より

on_retry_callback (callable)
 – much like the on_failure_callback
  except that it is executed when retries occur.
 – on_failure_callbackのような リトライ発生時のコールバック関数

* on_failure_callback については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

6)使用イメージ

* 「デフォルトでの設定」と「タスク個別に設定」することが可能

イメージ

from datetime import timedelta

# デフォルトでの設定例
default_args = {
  "provide_context": True,
  "retries": 1,
  "retry_exponential_backoff ": True,
  "retry_delay": timedelta(seconds=10)
}

# オペレータ個別による設定例
task1 = PythonOperator(
    task_id='task1',
    python_callable=say_hello,
    retries=3,
    retry_exponential_backoff=False,
    dag=dag
)

【4】サンプル

例1:実験コード

import os
import time
import datetime
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def say_hello(**context):
  print(f"こんにちは世界。10秒寝ます Zzz ... {datetime.datetime.now()}")
  time.sleep(10)
  print(f"Done ... {datetime.datetime.now()}")

def failed_on_purpose(**context):
  print(f"これからわざとエラーにします ... {datetime.datetime.now()}")
  raise Exception("Error...on purpose...")

def call_back_for_retry(**context):
  print(f"コールバック関数がコールされた ... {datetime.datetime.now()}")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "retries": 5,
  "retry_exponential_backoff ": True,
  "retry_delay": timedelta(seconds=10),
  "max_retry_delay": timedelta(seconds=25),
  "on_retry_callback": call_back_for_retry
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello_task1',
    dag=dag,
    python_callable=say_hello,
  )
  job2 = PythonOperator(
    task_id='say_hello_task2',
    dag=dag,
    # ★エラーを起こすメソッドを設定
    python_callable=failed_on_purpose,
    # retries=3,
    # retry_exponential_backoff=False
  )
  job1 >> job2

参考文献

https://qiita.com/munaita_/items/7fe474369a190d8d4ee6

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703

【Airflow】Apache Airflow ~ 通知サンプル編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/06/141323

の続き。

通知に関するサンプルをまとめておく。

目次

例1:SlackAPIPostOperatorを使ってSlackへ通知
 1)前提条件
 2)サンプル
例2:SlackWebhookOperatorを使ってSlackへ通知
 1)前提条件
 2)サンプル
例3:AWS SNS で通知する
 1)前提条件
 2)サンプル

例1:SlackAPIPostOperatorを使ってSlackへ通知

* SlackAPIPostOperatorについては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

1)前提条件

* 以下は、接続情報の保持として「Variable」を使用している
 => Variableの設定方法など詳細は、以下の関連記事を参照のこと。

Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454

2)サンプル

import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator


def say_hello():
  print('hello world')
  # raise Exception("Test from say_hello")

def say_hi():
  print('Hi, World')
  # raise Exception("Test from say_hi")

def send_slack_for_failure(context):
  ''' Callback for failure
  '''
  print("fire send_slack_for_failure")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_failure = SlackAPIPostOperator(
      task_id='slack_failure_task',
      token=Variable.get("your_slack_api_token"),
      text=str(context['exception']),
      channel='#your-room-name',
      username='admin-user')
    return slack_failure.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

def send_slack_for_success(context):
  ''' Callback for success
  '''
  print("fire send_slack_for_success")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_success = SlackAPIPostOperator(
      task_id='slack_success_task',
      token=Variable.get("your-access-token"),
      text="Your workflow is successful!!",
      channel='#your-room-name',
      username='admin-user')
    return slack_success.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

default_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "on_failure_callback": send_slack_for_failure
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=None,
  tags=['slack_test'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag
  )
  job2 = PythonOperator(
    task_id='say_hi',
    python_callable=say_hi,
    on_success_callback=send_slack_for_success,
    dag=dag
  )
  job1 >> job2

例2:SlackWebhookOperatorを使ってSlackへ通知

* SlackWebhookOperator については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

1)前提条件

* Slack の Incoming Webhooks の設定を行う
 => 以下の関連記事を参照のこと。

Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842

* Connection の設定(「http_conn_id='slack_webhook_demo'」を参照)
 => 以下の関連記事を参照のこと。

Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454

2)サンプル

import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator


def say_hello():
  print('hello world')
  # raise Exception("Test from say_hello")

def say_hi():
  print('Hi, World')
  # raise Exception("Test from say_hi")

def send_slack_for_failure(context):
  ''' Callback for failure
  '''
  print("fire send_slack_for_failure")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_failure = SlackWebhookOperator(
      task_id='slack_task_for_failure',
      http_conn_id='slack_webhook_demo',
      icon_emoji=':cat:',
      message='*Hello, world!* \nThis is a message for failure'
    )
    return slack_failure.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

def send_slack_for_success(context):
  ''' Callback for success
  '''
  print("fire send_slack_for_success")
  # try-catch しとかないとエラーの原因が分からないかも、、、
  try:
    slack_success = SlackWebhookOperator(
      task_id='slack_task_for_success',
      http_conn_id='slack_webhook_demo',
      icon_emoji=':dog:',
      message='*Hello, world!* \nThis is a message for success'
    )
    return slack_success.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

default_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "on_failure_callback": send_slack_for_failure
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=None,
  tags=['slack_test'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag
  )
  job2 = PythonOperator(
    task_id='say_hi',
    python_callable=say_hi,
    on_success_callback=send_slack_for_success,
    dag=dag
  )
  job1 >> job2

例3:AWS SNS で通知する

* SnsPublishOperator については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/06/141323

1)前提条件

* SNSの設定をしておく
 => 詳細は以下の関連記事を参照のこと。

Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313

2)サンプル

import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator


def say_hello():
  print('hello world')
  # raise Exception("Test from say_hello")

def say_hi():
  print('Hi, World')
  # raise Exception("Test from say_hi")

def send_email_for_failure(context):
  ''' Callback for failure
  '''
  print("Fire send_email_for_failure")
  try:
    email_failure = SnsPublishOperator(
      task_id='publish_sns_failure_task',
      target_arn='arn:aws:sns:us-west-2-xxxxxxx',
      subject='This is a title for failure',
      message='Hello World for failure')
    return email_failure.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

def send_email_for_success(context):
  ''' Callback for success
  '''
  print("Fire send_email_for_success")
  try:
    email_success = SnsPublishOperator(
      task_id='publish_sns_success_task',
      target_arn='arn:aws:sns:us-west-2-xxxxxxx',
      subject='This is a title for success',
      message='Hello World for success'
    )
    return email_success.execute(context=context)
  except Exception as ex:
    print("[Error] " + str(ex))
    raise ex

default_args = {
  "start_date": days_ago(2),
  "provide_context": True,
  "on_failure_callback": send_email_for_failure
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=None,
  tags=['sns_test'],
) as dag:
  job1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    dag=dag
  )
  job2 = PythonOperator(
    task_id='say_hi',
    python_callable=say_hi,
    on_success_callback=send_email_for_success,
    dag=dag
  )
  job1 >> job2

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842

【AWS】【トラブル】MWAA に関するトラブルシュート

■ はじめに

https://dk521123.hatenablog.com/entry/2021/09/29/131101

で、MWAA(Amazon Managed Workflow for Apache Airflow)を
使っていて、出くわした問題について、徐々にでは、
その解決方法などのトラブルシュートをメモっておく。

今回以外のトラブルシュートについては、以下の関連記事を参照。

MWAA で Secrets Manager の設定追加後にAirflow CLIからエラー
https://dk521123.hatenablog.com/entry/2024/01/18/122436

目次

【1】ログ確認時に「Could not read remote logs from log_group」が表示

【1】ログ確認時に「Could not read remote logs from log_group」が表示

NWAA の Web UI 上で、実行ログを確認できるのだが
その実行ログが、以下「エラー内容」のように表示され
実質、ログ出力が確認できない

エラー内容

*** Reading remote log from Cloudwatch log_group: airflow-xxxxx-Task log_stream: xxxxxx/xxxxxx/yyyy-MM-ddTxxx/1.log.
Could not read remote logs from log_group: airflow-xxxxx-Task log_stream: xxxxxx/xxxxxx/yyyy-MM-ddTxxx/1.log.

原因
https://docs.aws.amazon.com/mwaa/latest/userguide/t-cloudwatch-cloudtrail-logs.html#t-task-fail-permission

に記載されている通りで、権限問題。

解決案

* 例えば、Airflowの実行ロールに
 ClouldWatchのFullAccess権限を付加する

関連記事

MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101
MWAA で Secrets Manager の設定追加後にAirflow CLIからエラー
https://dk521123.hatenablog.com/entry/2024/01/18/122436