【AWS】Lambda ~ Python / 入門編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/05/105550

のコードを AWS Lambda (ラムダ)で実行する必要ができたのだが
よくよく考えると Lambda は、Javaでしか組んだことがなく、
Python は初めてだったので、メモっておく。

後、大分、前にやったから、すっかり忘れているので、
リハビリも兼ねて、デプロイ手順とかもメモっておく。

目次

【1】AWS Lambda
【2】テンプレート
【3】プログラム実行までの流れ
 1)Lambda関数をAWS上に登録する
 2)Test Event登録およびTest 実行
 3)処理したいプログラムを実装する
【4】その他技術事項
 1)環境変数の設定・取得
 2)ログ出力

【1】AWS Lambda

https://dk521123.hatenablog.com/entry/2017/04/05/235618

より抜粋
~~~~~~~~
* サーバレスのプログラム実行環境
~~~~~~~~

【2】テンプレート

* Python の場合、以下のようになる。

Pythonコード

import json

def lambda_handler(event, context):
  # ★ここに自分の実装を書く
  return {
    'statusCode': 200,
    'body': json.dump("Hello from Lambda!")
  }

【3】プログラム実行までの流れ

1)Lambda関数をAWS上に登録する

[1] AWSマネージメントコンソールにログインし、
 「Lambda」のページにアクセスする
[2] 画面右上の「Create function」ボタン押下
[3] 以下を入力し、画面右下の「Create function」ボタン押下
~~~~~~~
 * 「Author from scratch」を選択(デフォルトで選択されている)

Base information
 * Function name : 任意(今回は「hello_world」)
 * Runtime : Python 3.9 (他にも「3.6」~「3.8」が選択可能)
 * Architecture : x86_64(デフォルトで選択されている)
~~~~~~~

※ 後は、既存のロールやVPCなどがあれば選択する

2)Test Event登録およびTest 実行

「1)-[3]」の流れのまま...

[1] 「Test」ボタン押下
[2] Test Eventの設定ページにおいて、以下を入力し、「Test」ボタン押下
 * Event template: hello_world (デフォルトで選択されている)
 * Event name : 任意(ここでは「hello_world」)

3)処理したいプログラムを実装する

* テンプレートの「# ★ここに自分の実装を書く」部分に自分の実装を書く

使用上の注意

* 修正後にTestする際は、一度、「Deploy」ボタン押下後ではないと
 修正が反映されないので注意
 => 結構、ハマった。ログ埋め込んでもでてこないので、、、

【4】その他技術事項

1)環境変数の設定・取得

* 以下の公式サイトに記載されている。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/configuration-envvars.html

a) 環境変数を取得する(例:現在のRegion nameの取得)

import os

region = os.environ.get('AWS_REGION')

b) 独自の環境変数を設定する

[1] AWS マネージメントコンソールにおいて、対象のLambdaを選択する
[2] [Configuration]-[Enviroment variables]-[Edit]を選択
[3] 「Key」「Value」に環境変数名とその値を入力し、「Save」ボタン押下

 => 使い方は「a) 環境変数を取得する」のようにすればいい。

2)ログ出力

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/python-logging.html

に詳しく記載されている。

 上述の「使用上の注意」で述べているように
「Deploy」ボタン押下しないと修正が反映されないので注意。
 => ログ埋め込んでもログが出力されなくてハマって大変だった、、、

サンプル

import os
import logging

# print()文でもできる(簡単なツールならprint()でもいいかも)

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
  logger.info('## ENVIRONMENT VARIABLES')
  logger.info(os.environ)

確認方法

[1] AWS マネージメントコンソールにおいて、対象のLambdaを選択する
[2] [Monitor]-[Logs](又は「View logs in CloudWatch」)を選択

参考文献

https://dev.classmethod.jp/articles/lambda-my-first-step/

関連記事

Lambda ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2017/04/05/235618
Lambda ~ Lambda で気を付ける事項 ~
https://dk521123.hatenablog.com/entry/2018/02/04/233700
Lambda ~ Lambda でハマったこと ~
https://dk521123.hatenablog.com/entry/2017/12/16/231714
Serverless Framework ~ 環境設定編 ~
https://dk521123.hatenablog.com/entry/2023/11/02/000200
Serverless Framework ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/11/03/234825
機密データの管理 ~ Secrets Manager / boto3 編 ~
https://dk521123.hatenablog.com/entry/2021/10/05/105550

【Airflow】Apache Airflow ~ 通知あれこれ編 ~

■ はじめに

Apache Airflow で処理の失敗時やワークフロー成功時に
以下のサービスへ通知する方法を調べたので、メモっておく。
~~~~~~~~~~
【1】Slack
【2】Email / AWS SES
【3】AWS SNS
【4】Microsoft Teams
~~~~~~~~~~

目次

【0】全般的な注意点
 1)メッセージ内容が日本語を使用する場合
【1】Slack
 0)注意事項
 1)SlackAPIOperator
 2)SlackAPIPostOperator
 3)SlackAPIFileOperator
 4)SlackWebhookOperator
【2】Email / AWS SES
 0)注意事項
 1)EmailOperator
 2)SESHook
【3】AWS SNS
 0)注意事項
 1)SnsPublishOperator
 2)AwsSnsHook
【4】Microsoft Teams
【5】通知に関する技術事項
 1)実装せずにタスク失敗をEmail通知する
 2)タスク成功・失敗時に通知する
 3)接続情報の保持

なお、サンプルについては、以下の関連記事を参照のこと。

Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000

【0】全般的な注意点

1)メッセージ内容が日本語を使用する場合

* DAGのPythonコードを、
 ファイル形式「UTF-8」で保存すること

 => 日本語のメッセージでテストした際にWeb UI側で
  エラー「SyntaxError: ... 'utf-8' codec can't decode ...」が
  表示されたので、何かなーって原因を探ったら、ファイル形式だった
 => 詳細は、以下の関連記事を参照のこと。

Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000

【1】Slack

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/_api/airflow/providers/slack/operators/slack/index.html
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html

0)注意事項

* Slack の Access Token を用意する必要がある
 => トークンに関しては、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/13/092235

* WebHook の場合は、 Incoming Webhookを有効にする

https://dk521123.hatenablog.com/entry/2021/10/15/091842

1)SlackAPIOperator

* 以下の1)~2)の親クラス

2)SlackAPIPostOperator

* Slack チェンネルへのPost送信を実行

構文

from airflow.operators.slack_operator import SlackAPIPostOperator

slack = SlackAPIPostOperator(
    task_id="post_hello",
    dag=dag,
    token="XXX",
    text="hello there!",
    channel="#random",
)

3)SlackAPIFileOperator

* Slack チャンネルへのファイル送信を実行

構文

# Send file with filename and filetype
slack = SlackAPIPostOperator(
    task_id="post_hello",
    dag=dag,
    token="XXX",
    text="hello there!",
    channel="#random",
)

4)SlackWebhookOperator

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html

* Webhook については、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/15/091842

* Connection を使用する(「http_conn_id='slack_webhook'」部分)場合は、
 以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/10/16/000454

構文

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_job = SlackWebhookOperator(
  task_id='slack_task',
  # Connectionで指定する(webhook_tokenでトークンで指定することも可能)
  http_conn_id='slack_webhook',
  message='*Hello, world!* \nThis is a message)',
  username='slack-user-name',
  icon_emoji=':dog:',
  channel='#your-channel-name',
  dag=dag
)

コード
http://airflow.apache.org/docs/apache-airflow/1.10.13/_modules/airflow/contrib/operators/slack_webhook_operator.html

参考文献
https://qiita.com/munaita_/items/ad0f6f9590185741fff6

【2】Email / AWS SES

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/email/index.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/hooks/ses/index.html

* なお、Amazon SES に関する詳細は、以下の関連記事を参照のこと

Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103

0)注意事項

* AirflowにSMTPの接続情報を設定する必要がある

設定方法

[1] airflow.cfgで設定する方法
 => 詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html

[2] 環境変数で指定する方法
 => 詳細は、以下のサイトを参照。

https://dev.classmethod.jp/articles/apache-airflow-send-mail-using-amazon-ses/

* 設定値については、以下の公式ドキュメントを参照。

設定方法 / AWS MWAA

* MWAA(Amazon Managed Workflow for Apache Airflow)での設定は
 以下のサイトを参照のこと(基本的にやることは、前述と変わらない)

https://qiita.com/pioho07/items/58c97bbfa8163f713d80

Email
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html?highlight=email#email
SMTP (Simple Mail Transfer Protocol)
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#smtp

1)EmailOperator

* Email送信する
* 添付ファイルも引数「files (list)」で送れそう

構文

from airflow.operators.email import EmailOperator

send_mail = EmailOperator(
  task_id="sendmail",
  to="to_address@xxxx.com",
  cc="cc_address@xxxx.com",
  bcc="bcc_address@xxxx.com",
  subject="This is a title",
  html_content="<b>Hello World</b>",
  mime_charset="utf-8",
)

2)SESHook

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/sns/index.html

* SES(Amazon Simple Email Service) のフック関数
 => 使うことあるか分からんが、、、

参考文献
https://dev.classmethod.jp/articles/apache-airflow-send-mail-using-amazon-ses/
https://qiita.com/pioho07/items/58c97bbfa8163f713d80
https://bhavaniravi.com/blog/sending-emails-from-airflow/

【3】AWS SNS

* 関連するAPI仕様の詳細は、以下のサイトを参照。

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/sns_publish_operator/index.html
https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/hooks/aws_sns_hook/index.html

* 他のサービスと比較すると、あまり参考になるサイトがない印象。
 => 以下のサイトくらい?

https://www.mikulskibartosz.name/send-sms-from-airflow-using-aws-sns/

* なお、Amazon SNS に関する詳細は、以下の関連記事を参照のこと

Amazon SNS ~ 通知サービス ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213

0)注意事項

* 以下を行う必要がある
 [1] AWS SNSで、トピックとサブスクリプションを追加

1)SnsPublishOperator

* Amazon SNSへメッセージをPublishする

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/sns_publish_operator/index.html

構文

from airflow.contrib.operators.sns_publish_operator import SnsPublishOperator

publish_sns = SnsPublishOperator(
  task_id='publish_sns_task',
  aws_conn_id='your_sns_connection_id',
  target_arn='your_sns_topic_arn',
  subject='This is a title',
  message='Hello World'
)

コード

* 内部で「2)AwsSnsHook」を呼び出している

https://airflow.apache.org/docs/apache-airflow/1.10.12/_modules/airflow/contrib/operators/sns_publish_operator.html

補足:引数「aws_conn_id」「target_arn」

* aws_conn_id : aws connection to use
 => 使用するAWSコネクション??

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html

 => 以下のサイトが参考になりそう。

https://dev.classmethod.jp/articles/apache-airflow-aws-conn-assume-role/

* target_arn : either a TopicArn or an EndpointArn
 => トピックARNかエンドポイントARNのどちらか
 => トピックの ARN「arn:aws:sns:<RegionName>-xxxxxxx」
  を指定すればよさそう

2)AwsSnsHook

* SNS への フック関数

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/hooks/aws_sns_hook/index.html

* コードを参照したところ、boto3 API - public(通知依頼) を使用している模様

コード
http://airflow.apache.org/docs/apache-airflow/1.10.6/_modules/airflow/contrib/hooks/aws_sns_hook.html
boto3 API - public(通知依頼)
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#SNS.Client.publish

【4】Microsoft Teams

* HTTP Post送信でできそう。
 => 詳細は、以下の参考文献を参照。

参考文献
https://qiita.com/whata/items/dba1c2db1a38d9d4a4c0
https://zenn.dev/antyuntyun/articles/airflow_custom_notification

【5】通知に関する技術事項

1)実装せずにタスク失敗をEmail通知する

* Email送信であれば「'email_on_failure': True」って設定がある

https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/operators/index.html

より抜粋
~~~~~~~~~~~~
email_on_failure (bool)
 – Indicates whether email alerts should be sent when a task failed
=> タスク失敗時に、Emailアラートを送るかどうかを示す
~~~~~~~~~~~~

Pythonコード・一部抜粋

default_args = {
  'email': ['xxxxx@gmail.com'],
  'email_on_failure': True,
  'email_on_retry': True,
}

使用上の注意

* 「【2】Email / AWS SES」の「0)注意事項」で記載しているのと同様に
 「AirflowにSMTPの接続情報を設定する必要がある」。
 => 何も設定せずに行うと、ログに以下のエラーが表示されてしまう。

~~~~~~~~
ConnectionRefusedError: [Errno 111] Connection refused
~~~~~~~~

https://github.com/puckel/docker-airflow/issues/425

2)タスク成功・失敗時に通知する

* コールバックが用意されているので
 失敗時:on_failure_callback
 成功時:on_success_callback

* default_argsに設定すればDAG内の全タスクに適応可能
 => 失敗時は、default_argsに設定し、
  成功時は、最後のタスクに設定すればよさそう

実装イメージ

def send_for_failure(context):
  try:
    # 例外内容
    exception_message = str(context['exception'])
    # タスク インスタンス(※1参照)
    task_instance = context.get('task_instance')
    # Task ID
    task_id = task_instance.task_id
    # DAG ID
    task_id = task_instance.dag_id
    # Execution Date
    task_id = task_instance.execution_date

    # 以降は、送信する処理(省略)
  except Exception as ex:
    print("Error " + str(ex))
    raise ex

※1:タスク インスタンス

* タスク インスタンスの詳細は、以下の公式ドキュメントを参照。

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html

参考文献
https://qiita.com/munaita_/items/1a5b131839e01ea7280d
https://blog.imind.jp/entry/2019/02/08/170332
https://medium.com/@momota/airflow-dag%E3%81%AB%E3%81%8A%E3%81%91%E3%82%8B%E5%90%84%E3%82%BF%E3%82%B9%E3%82%AF%E3%81%AE%E6%88%90%E5%8A%9For%E5%A4%B1%E6%95%97%E3%82%92slack%E3%81%AB%E9%80%9A%E7%9F%A5%E3%81%99%E3%82%8B-5eecda9ec378

3)接続情報の保持

* Tokenなどの接続情報の保持は、「Variable」を使用していると良さそう
 => Variableの設定方法など詳細は、以下の関連記事を参照のこと。

https://dk521123.hatenablog.com/entry/2021/09/30/163020

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow に関するトラブル
https://dk521123.hatenablog.com/entry/2021/10/03/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
Amazon SES ~ Emailサービス ~
https://dk521123.hatenablog.com/entry/2017/04/28/234103
Amazon SNS ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/06/03/175213
Amazon SNS ~ 基本編 / Email ~
https://dk521123.hatenablog.com/entry/2021/10/14/092313
Slack ~ Access Token ~
https://dk521123.hatenablog.com/entry/2021/10/13/092235
Slack ~ Incoming Webhooks ~
https://dk521123.hatenablog.com/entry/2021/10/15/091842

【AWS】Secrets Manager ~ boto3 編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2020/03/12/220717

の続き。

 Secrets Manager に含まれている情報を、
boto3 でアクセスして、ごにょごにょする必要ができたので
調べて、メモを残しておく。

目次

【1】API仕様
 1)get_secret_value()
【2】サンプル
 例1:RDS認証情報をboto3 APIで取得する
【3】トラブル
 1)boto3 API時にタイムアウトエラーが発生する

【1】API仕様

* 以下を参照のこと。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/secretsmanager.html

1)get_secret_value()

* Secrets Managerで管理されている認証情報を取得するAPI
* 以下「Returns / Response Syntax」の「SecretString」に
 JSON形式で認証情報が入っている(ない場合は「SecretBinary」を使用)

Request Syntax

response = client.get_secret_value(
    SecretId='string',
    VersionId='string',
    VersionStage='string'
)

Returns / Response Syntax

{
    'ARN': 'string',
    'Name': 'string',
    'VersionId': 'string',
    'SecretBinary': b'bytes', # ★注目★
    'SecretString': 'string', # ★注目★
    'VersionStages': [
        'string',
    ],
    'CreatedDate': datetime(2015, 1, 1)
}

【2】サンプル

例1:RDS認証情報をboto3 APIで取得する

import ast
import json
import base64
import boto3
from botocore.exceptions import ClientError

# 定数
TARGET_SECREAT_ID = "sample-rds-key"
TARGET_REGION_NAME = "ap-northeast-1"

print(f"Start. {TARGET_SECREAT_ID}")

secrets_manager_client = boto3.client(
  'secretsmanager',
  region_name=TARGET_REGION_NAME)

try:
  response = secrets_manager_client.get_secret_value(
    SecretId=TARGET_SECREAT_ID
  )
  if 'SecretString' in response:
    secret = response['SecretString']
  else:
    secret = base64.b64decode(response['SecretBinary'])

  # astモジュールについては、下記の関連記事を参照
  # https://dk521123.hatenablog.com/entry/2021/10/01/000000
  secret_info = ast.literal_eval(secret)

  user_name = secret_info["username"]
  user_password = secret_info["password"]

  print(f"user_name={user_name}, user_password={user_password}")

  print("Done")
except ClientError as ex:
  print(f"Error : {ex}")
  raise ex

【3】トラブル

1)boto3 API時にタイムアウトエラーが発生する

boto3 API時に以下「エラー内容」のようなタイムアウトエラーが発生する

エラー内容

urllib3.exceptions.ConnectTimeoutError:
(<botocore.awsrequest.AWSHTTPSConnection object at 0xXXXXXXXXX>,
 'Connection to secretsmanager.us-west-1.amazonaws.com
 timed out. (connect timeout=60)')

解決案
https://yohei-a.hatenablog.jp/entry/20200107/1578365127

より抜粋
~~~~~~~~~~~~~~
1. Secrets Manager の VPCエンドポイントを作成する。
2. VPCエンドポイントの Security Groupで 
 Python Shell のジョブの Security Group からのアクセスを許可する。
~~~~~~~~~~~~~~

参考文献

https://dev.classmethod.jp/articles/secrets_manager_tips_get_api_key/

関連記事

機密データの管理 ~ Secrets Manager 編 ~
https://dk521123.hatenablog.com/entry/2020/03/12/220717
機密データの管理 ~ Secrets Manager / AWS CLI 編 ~
https://dk521123.hatenablog.com/entry/2022/06/14/110641
CodeBuild で パラメータストア / Secrets Manager を使う
https://dk521123.hatenablog.com/entry/2020/02/18/230358
AWS Glue ~ Boto3 / Glue connection編 ~
https://dk521123.hatenablog.com/entry/2020/01/29/224525
Python ~ 基本編 / astモジュール ~
https://dk521123.hatenablog.com/entry/2021/10/01/000000
Lambda ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2017/04/05/235618
Lambda ~ Python / 入門編 ~
https://dk521123.hatenablog.com/entry/2021/10/07/103317