【Airflow】Apache Airflow ~ あれこれ編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/09/29/131101

で、AWS 上で、
Apache Airflow(NWAA:Amazon Managed Workflow for Apach Airflow)を
動かした際に、ちらほら抜けている部分が多かったので
基本的なTip集をメモしておく。

目次

【1】スケジューリング - schedule_interval
【2】Task 間で値を受け渡す - XComs
 1)サンプル
【3】DAGにパラメータを渡す - dag_run.conf
 1)利用例
  a) PythonOperatorの場合
【4】Airflow の設定
 1)設定ファイル「airflow.cfg」
 2)環境変数
【5】AWS のサービス連携
 1)Glueとの連携
 2)EMRとの連携

【1】スケジューリング - schedule_interval

https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html#dag-runs

に preset が記載されている。
preset meaning cron 設定例
None スケジューリングしない。「externally triggered」を使う - schedule_interval=None
@once スケジューリングは1回だけ。本当に1回のみ。 - schedule_interval='@once'
@hourly 時間の初めに1時間に1度実行する 0 * * * * schedule_interval='@hourly'

【2】Task 間で値を受け渡す - XComs

https://dk521123.hatenablog.com/entry/2021/09/28/135510

より抜粋
~~~~~~~~~
XComs
* 異なるTask間でデータをやり取りするための手段
~~~~~~~~~

1)サンプル

import os
from datetime import timedelta
from textwrap import dedent
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def push(**context):
  context["task_instance"].xcom_push(
    key="hello", value="Hello World!!")

def pull(**context):
  # ★ (私事だが)ハマったところ ★
  # プロパティ「task_ids」を「task_id」にして例外でハマった 
  result = context["task_instance"].xcom_pull(
    task_ids="push_task", key="hello")
  print(f"result = {result}")

defalut_args = {
  "start_date": days_ago(2),
  "provide_context": True
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=defalut_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='push_task',
    dag=dag,
    python_callable=push,
  )
  job2 = PythonOperator(
    task_id='pull_task',
    dag=dag,
    python_callable=pull,
  )
  job1 >> job2

参考文献
https://dev.classmethod.jp/articles/read-provide-context-and-task-instance/
https://www.flywheel.jp/topics/airflow-dependency-between-dags/

【3】DAGにパラメータを渡す - dag_run.conf

* AWS Glue で言うと Default run properties のように
 ワークフローの実行ごとにパラメータを渡すことができる

1)利用例

[1] MWAA(Amazon Managed Workflow for Apache Airflow)の場合、
 Runボタン押下後に「Configuration JSON (Optional)」欄に
 JSON形式で、設定したい値を入力し、「Run」する
~~~
{} => { "value": "Hello" }
~~~

[2] プログラム先で、値を取得する

a) PythonOperatorの場合

def demo_method(**context):
  # 三項演算子的なものを使って
  # Key「value」が設定されていたらその値(今回の場合「Hello」)
  # 特に設定されていなければ「World」
  result = context["dag_run"].conf["value"] \
    if "value" in context["dag_run"].conf.keys() \
    else "World"
  print(f"result ={result}")

【4】Airflow の設定

1)設定ファイル「airflow.cfg」

* 場所:$AIRFLOW_HOME/airflow.cfg

参考文献
https://qiita.com/K_ichi/items/a3809f679a4578e7cb97

2)環境変数

* フォーマット「$AIRFLOW__{SECTION}__{KEY}」
 =>  SECTIONとKEYの間は_が2つ

参考文献
https://adragoona.hatenablog.com/entry/2018/03/18/002124

【5】AWS のサービス連携

* 以下のサイトが役に立ちそう

https://amazon-mwaa-for-analytics.workshop.aws/en/workshop-2.0.2/m1-processing.html

* 基本的に、下記にも述べているがAPIが用意されているので、
 それを使うといい
 => 最悪、足りない処理は、boto3 API で実装すればいい

1)Glueとの連携

(1)Glue Operators の利用

* 専用オペレータ AwsGlueJobOperator や
 AWSGlueCrawlerOperator が用意されている

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/glue/index.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/glue_crawler/index.html

(2)boto3 APIを使った実装

* 以下のサイトがドンピシャ
 => PythonOperatorを定義し、 start_job_run()で実行し、
  その後無限ループでget_job_run()で成功したら抜ける実装
 => (1)のAPIのソースをみると似たような処理をしていた

https://qiita.com/pioho07/items/97ebd1351916177d50f3

2)EMRとの連携

(1)EMR Operators / Sensor の利用

* EMR Operators / Sensorが用意されている

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr.html

* 公式サイトにサンプルがある

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.html

補足:AWS公式サイトのブログより

以下に「Apache Airflow、Genie、および Amazon EMR で
ビッグデータワークフローのオーケストレーションを行う: Part 1~2」
ってブログがある。
 => そこでは、どうも、「Genie(ジニー?)」から
  EMRをハンドリングしている模様

https://aws.amazon.com/jp/blogs/news/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-1/
https://aws.amazon.com/jp/blogs/news/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-2/
Genie
https://netflix.github.io/genie/

より抜粋
~~~~~~~
Genie is a completely open source distributed job orchestration engine developed by Netflix. 

[意訳]
Genie は、Netflix社によって開発された
完全なオープンソースの分散型ジョブオーケストレーションエンジンです。
~~~~~~~

参考文献
https://ohbarye.hatenablog.jp/entry/2020/10/18/airflow-parameterized-dag-run
https://dev.classmethod.jp/articles/airflow-faq/

参考文献

https://future-architect.github.io/articles/20200131/

関連記事

Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703