【Airflow】Apache Airflow ~ EMR ~

■ Introduction

These are my notes on how to launch EMR from Airflow.

Table of Contents

【0】Types of EMR
【1】EMR Serverless
【2】EMR

【0】Types of EMR

* EMR comes in the following types, and the Operators you use differ accordingly

1)EMR Serverless
2)EMR (on EC2)
etc.

# There are also others, such as "Amazon EMR on Amazon EKS"
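As a quick reference, here are the operator/sensor classes used in this article, grouped by EMR type. The EKS entry (EmrContainerOperator) is not covered in this article and comes from my understanding of the amazon provider, so treat it as an assumption; the dict and helper names are hypothetical.

```python
# Hypothetical cheat sheet: EMR type -> Airflow classes (amazon provider).
# "serverless" and "ec2" entries are the classes used in this article;
# the "eks" entry is an assumption to verify against your provider version.
EMR_OPERATORS = {
    "serverless": [
        "EmrServerlessCreateApplicationOperator",
        "EmrServerlessStartJobOperator",
        "EmrServerlessDeleteApplicationOperator",
    ],
    "ec2": [
        "EmrCreateJobFlowOperator",
        "EmrAddStepsOperator",
        "EmrStepSensor",
        "EmrTerminateJobFlowOperator",
    ],
    "eks": ["EmrContainerOperator"],
}


def operators_for(emr_type: str) -> list[str]:
    """Return the class names for an EMR type, or raise for unknown types."""
    try:
        return EMR_OPERATORS[emr_type]
    except KeyError:
        raise ValueError(f"unknown EMR type: {emr_type!r}") from None


print(operators_for("ec2")[0])  # EmrCreateJobFlowOperator
```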

【1】EMR Serverless

https://docs.aws.amazon.com/ja_jp/emr/latest/EMR-Serverless-UserGuide/using-airflow.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr/emr_serverless.html

1)Sample

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "DOC-EXAMPLE-BUCKET"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{S3_LOGS_BUCKET}/logs/"}
    },
}

DAG_ID = os.path.basename(__file__).replace(".py", "")

with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=datetime(2023, 7, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        # Type: only two kinds are supported, SPARK and HIVE
        job_type="SPARK",
        release_label="emr-6.11.0",
        config={"name": "airflow-test"},
    )

    application_id = create_app.output

    app_job = EmrServerlessStartJobOperator(
        task_id="start_app_job",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    create_app >> app_job >> delete_app
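In the sample above, delete_app uses trigger_rule="all_done" so the application is deleted even if start_app_job fails. The following is a simplified pure-Python model of how the default rule (all_success) differs from all_done; it is not Airflow's scheduler code, and only these two rules are modelled.

```python
# Simplified model of two Airflow trigger rules (not Airflow's implementation).
# States an upstream task can be in once it has finished running.
FINISHED = {"success", "failed", "upstream_failed", "skipped"}


def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Decide whether a downstream task runs, given its upstream states."""
    if trigger_rule == "all_success":
        # Default rule: every upstream task must have succeeded.
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_done":
        # Runs once every upstream task has finished, whatever the outcome.
        return all(s in FINISHED for s in upstream_states)
    raise ValueError(f"rule not modelled here: {trigger_rule}")


# start_app_job failed: the default rule skips cleanup, all_done still deletes the app.
print(should_run("all_success", ["failed"]))  # False
print(should_run("all_done", ["failed"]))     # True
```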

【2】EMR

https://docs.aws.amazon.com/ja_jp/mwaa/latest/userguide/samples-emr.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr/emr.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/emr/index.html

Third-party site
https://takemikami.com/2021/05/21/AirflowEMRStep.html

1)Sample

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor


SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-6.11.0",
    # The available Applications seem to be listed in each version's release notes, e.g.:
    # https://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/emr-6110-release.html#emr-6110-app-versions
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
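    },
    # Note: add_steps below submits SPARK_STEPS again, so as written SparkPi runs twice.
    # Drop one of the two if that is not intended.
    "Steps": SPARK_STEPS,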
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

DAG_ID = os.path.basename(__file__).replace(".py", "")

with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=datetime(2023, 7, 1),
    tags=["example"],
    catchup=False,
) as dag:
    # Step1: Create the EMR cluster
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES
    )
    # Step2: Add the steps to the cluster
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )
    # Step3: Wait for the added step to finish
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )
    # Step4: Terminate the EMR cluster
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        # Run even if an upstream task failed (same idea as delete_app in the Serverless sample)
        trigger_rule='all_done',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover
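The Jinja templates in the sample pull XCom values: EmrCreateJobFlowOperator returns the cluster (job flow) ID, and EmrAddStepsOperator returns a list of step IDs, which is why the sensor's step_id ends in [0]. Below is a minimal stand-in for that lookup, with a plain dict playing the role of the XCom store; the IDs are made up and the xcom_pull function here is hypothetical, not Airflow's.

```python
# Hypothetical XCom store; real IDs are returned by AWS at runtime.
xcom = {
    ("create_job_flow", "return_value"): "j-EXAMPLE",  # cluster (job flow) ID
    ("add_steps", "return_value"): ["s-EXAMPLE"],      # list of step IDs
}


def xcom_pull(task_ids: str, key: str = "return_value"):
    """Stand-in for task_instance.xcom_pull: look up a value pushed by a task."""
    return xcom[(task_ids, key)]


job_flow_id = xcom_pull(task_ids="create_job_flow")
# add_steps returns a list, one ID per submitted step; SPARK_STEPS has one step.
step_id = xcom_pull(task_ids="add_steps")[0]
print(job_flow_id, step_id)  # j-EXAMPLE s-EXAMPLE
```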

2)Operators used

[1] EmrCreateJobFlowOperator

* Creates an EMR cluster

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/emr/index.html#airflow.providers.amazon.aws.operators.emr.EmrCreateJobFlowOperator
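Before handing JOB_FLOW_OVERRIDES to EmrCreateJobFlowOperator, a small pre-flight check can catch obvious mistakes in the instance groups. This is a hypothetical helper, not part of Airflow; the "exactly one MASTER group" rule reflects my understanding of EMR's instance-group configuration.

```python
from collections import Counter


def instance_counts_by_role(job_flow_overrides: dict) -> dict:
    """Hypothetical check: total instances per role, requiring one MASTER group."""
    groups = job_flow_overrides["Instances"]["InstanceGroups"]
    roles = [g["InstanceRole"] for g in groups]
    if roles.count("MASTER") != 1:
        raise ValueError("expected exactly one MASTER instance group")
    counts = Counter()
    for g in groups:
        counts[g["InstanceRole"]] += g["InstanceCount"]
    return dict(counts)


# Same shape as the "Instances" part of JOB_FLOW_OVERRIDES in the sample.
overrides = {
    "Instances": {
        "InstanceGroups": [
            {"Name": "Primary node", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
        ]
    }
}
print(instance_counts_by_role(overrides))  # {'MASTER': 1}
```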

[2] EmrAddStepsOperator

* Adds Steps to an EMR cluster

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/emr/index.html#airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator
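The steps passed to EmrAddStepsOperator are plain dicts in the shape of SPARK_STEPS. A hypothetical helper that builds them, validating ActionOnFailure against the values the EMR API accepts; the second step's S3 script path is made up for illustration.

```python
# Values accepted by the EMR API for a step's ActionOnFailure.
VALID_ACTIONS = {"TERMINATE_JOB_FLOW", "TERMINATE_CLUSTER", "CANCEL_AND_WAIT", "CONTINUE"}


def make_command_runner_step(name, args, action_on_failure="CONTINUE"):
    """Hypothetical helper: build one step dict in the same shape as SPARK_STEPS."""
    if action_on_failure not in VALID_ACTIONS:
        raise ValueError(f"invalid ActionOnFailure: {action_on_failure}")
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {
            # command-runner.jar lets a step run an arbitrary command on the cluster
            "Jar": "command-runner.jar",
            "Args": list(args),
        },
    }


steps = [
    # Reproduces the calculate_pi step from the sample.
    make_command_runner_step("calculate_pi", ["/usr/lib/spark/bin/run-example", "SparkPi", "10"]),
    # Hypothetical PySpark job stored in S3.
    make_command_runner_step(
        "my_job",
        ["spark-submit", "--deploy-mode", "cluster", "s3://DOC-EXAMPLE-BUCKET/scripts/my_job.py"],
    ),
]
```

The resulting list can be passed as steps=... to EmrAddStepsOperator in place of SPARK_STEPS.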

[3] EmrStepSensor

* Monitors the execution of an EMR Step

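https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/sensors/emr_step_sensor/index.html#module-contents
(This link is the Airflow 1.10.14 contrib documentation; in current versions the sensor is imported from airflow.providers.amazon.aws.sensors.emr, as in the sample.)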
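EmrStepSensor repeatedly checks ("pokes") the step's state until it reaches a target state or a failure state. A rough sketch of that loop, where get_state is a hypothetical callable standing in for the EMR DescribeStep call; the target/failed state sets reflect my understanding of the sensor's defaults and should be checked against your provider version.

```python
import time
from typing import Callable

# Assumed defaults (verify against your amazon provider version).
TARGET_STATES = {"COMPLETED"}
FAILED_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(get_state: Callable[[], str], poke_interval: float = 0.0,
                  max_pokes: int = 100) -> str:
    """Poll get_state() until the step completes, fails, or max_pokes is reached."""
    for _ in range(max_pokes):
        state = get_state()
        if state in TARGET_STATES:
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f"EMR step ended in {state}")
        time.sleep(poke_interval)  # PENDING / RUNNING etc.: wait and poke again
    raise TimeoutError("step did not reach a terminal state")


# Simulated sequence of states returned by successive DescribeStep calls.
states = iter(["PENDING", "RUNNING", "COMPLETED"])
print(wait_for_step(lambda: next(states)))  # COMPLETED
```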

[4] EmrTerminateJobFlowOperator

* Terminates the EMR cluster

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/emr/index.html#airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator

Related articles

Apache Airflow ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Creating DAGs ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000