■ はじめに
https://dk521123.hatenablog.com/entry/2021/09/29/131101
で、AWS 上で、 Apache Airflow(NWAA:Amazon Managed Workflow for Apach Airflow)を 動かした際に、ちらほら抜けている部分が多かったので 基本的なTip集をメモしておく。
目次
【1】スケジューリング - schedule_interval 【2】DAGにパラメータを渡す - dag_run.conf 1)利用例 a) PythonOperatorの場合 【3】Airflow の設定 1)設定ファイル「airflow.cfg」 2)環境変数 【4】AWS のサービス連携 1)Glueとの連携 2)EMRとの連携
【1】スケジューリング - schedule_interval
https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html#dag-runs
に preset が記載されている。
preset | meaning | cron | 設定例 |
---|---|---|---|
None | スケジューリングしない。「externally triggered」を使う | - | schedule_interval=None |
@once | スケジューリングは1回だけ。本当に1回のみ。 | - | schedule_interval='@once' |
@hourly | 時間の初めに1時間に1度実行する | 0 * * * * | schedule_interval='@hourly' |
【2】DAGにパラメータを渡す - dag_run.conf
* AWS Glue で言うと Default run properties のように ワークフローの実行ごとにパラメータを渡すことができる
1)利用例
[1] MWAA(Amazon Managed Workflow for Apache Airflow)の場合、 Runボタン押下後に「Configuration JSON (Optional)」欄に JSON形式で、設定したい値を入力し、「Run」する ~~~ {} => { "value": "Hello" } ~~~ [2] プログラム先で、値を取得する
a) PythonOperatorの場合
def demo_method(**context): # 三項演算子的なものを使って # Key「value」が設定されていたらその値(今回の場合「Hello」) # 特に設定されていなければ「World」 result = context["dag_run"].conf["value"] \ if "value" in context["dag_run"].conf.keys() \ else "World" print(f"result ={result}")
【3】Airflow の設定
1)設定ファイル「airflow.cfg」
* 場所:$AIRFLOW_HOME/airflow.cfg
参考文献
https://qiita.com/K_ichi/items/a3809f679a4578e7cb97
2)環境変数
* フォーマット「$AIRFLOW__{SECTION}__{KEY}」 => SECTIONとKEYの間は_が2つ
参考文献
https://adragoona.hatenablog.com/entry/2018/03/18/002124
【4】AWS のサービス連携
* 以下のサイトが役に立ちそう
https://amazon-mwaa-for-analytics.workshop.aws/en/workshop-2.0.2/m1-processing.html
* 基本的に、下記にも述べているがAPIが用意されているので、 それを使うといい => 最悪、足りない処理は、boto3 API で実装すればいい
1)Glueとの連携
(1)Glue Operators の利用
* 専用オペレータ AwsGlueJobOperator や AWSGlueCrawlerOperator が用意されている
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/glue/index.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/glue_crawler/index.html
(2)boto3 APIを使った実装
* 以下のサイトがドンピシャ => PythonOperatorを定義し、 start_job_run()で実行し、 その後無限ループでget_job_run()で成功したら抜ける実装 => (1)のAPIのソースをみると似たような処理をしていた
https://qiita.com/pioho07/items/97ebd1351916177d50f3
2)EMRとの連携
(1)EMR Operators / Sensor の利用
* EMR Operators / Sensorが用意されている
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr.html
* 公式サイトにサンプルがある
補足:AWS公式サイトのブログより
以下に「Apache Airflow、Genie、および Amazon EMR で ビッグデータワークフローのオーケストレーションを行う: Part 1~2」 ってブログがある。 => そこでは、どうも、「Genie(ジニー?)」から EMRをハンドリングしている模様
https://aws.amazon.com/jp/blogs/news/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-1/
https://aws.amazon.com/jp/blogs/news/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-2/
Genie
https://netflix.github.io/genie/
より抜粋 ~~~~~~~ Genie is a completely open source distributed job orchestration engine developed by Netflix. [意訳] Genie は、Netflix社によって開発された 完全なオープンソースの分散型ジョブオーケストレーションエンジンです。 ~~~~~~~
参考文献
https://ohbarye.hatenablog.jp/entry/2020/10/18/airflow-parameterized-dag-run
https://dev.classmethod.jp/articles/airflow-faq/
参考文献
https://future-architect.github.io/articles/20200131/
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703