■ Introduction
Quick notes on how to launch EMR from Airflow.
Table of contents
【0】EMR types
【1】EMR Serverless
【2】EMR
【0】EMR types
* EMR comes in several flavors, and the Airflow Operators to use change accordingly (see the import sketch below)
 1) EMR Serverless
 2) EMR (on EC2)
 # There are others as well, such as "Amazon EMR on Amazon EKS"
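As a quick map of which Operators go with which flavor, here is a minimal import sketch. All of them live in the emr module of the Amazon provider package (apache-airflow-providers-amazon); exact availability depends on the installed provider version:

from airflow.providers.amazon.aws.operators.emr import (
    # 1) EMR Serverless
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
    # 2) EMR (on EC2); a "job flow" is an EMR cluster
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator,
    # Amazon EMR on Amazon EKS (may require a recent provider version)
    EmrContainerOperator,
)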
【1】EMR Serverless
https://docs.aws.amazon.com/ja_jp/emr/latest/EMR-Serverless-UserGuide/using-airflow.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr/emr_serverless.html
1) Sample
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "DOC-EXAMPLE-BUCKET"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{S3_LOGS_BUCKET}/logs/"}
    },
}

DAG_ID = os.path.basename(__file__).replace(".py", "")

with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=datetime(2023, 7, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        # job_type: only the two types Hive / Spark are supported
        job_type="SPARK",
        release_label="emr-6.11.0",
        config={"name": "airflow-test"},
    )
    application_id = create_app.output

    app_job = EmrServerlessStartJobOperator(
        task_id="start_app_job",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        # Run even if the job fails, so the application is always cleaned up
        trigger_rule="all_done",
    )

    create_app >> app_job >> delete_app
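Two points worth noting about this sample: create_app.output is an XComArg that resolves at run time to the application ID returned by create_spark_app, and trigger_rule="all_done" makes delete_app run whether or not the job succeeded, so the application is always cleaned up. The entryPoint points at pi_fail.py, presumably to exercise exactly that failure path. As a sketch (not part of the AWS sample), you could instead run your own PySpark script from S3; the script path and arguments below are hypothetical placeholders:

    app_job = EmrServerlessStartJobOperator(
        task_id="start_app_job",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                # Hypothetical script location: replace with your own bucket/key
                "entryPoint": f"s3://{S3_LOGS_BUCKET}/scripts/my_job.py",
                "entryPointArguments": ["2023-07-01"],
                "sparkSubmitParameters": "--conf spark.executor.memory=2g",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )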
【2】EMR
https://docs.aws.amazon.com/ja_jp/mwaa/latest/userguide/samples-emr.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr/emr.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/emr/index.html
General sites
https://takemikami.com/2021/05/21/AirflowEMRStep.html
1) Sample
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-6.11.0",
    # The Applications available for a release can be checked in that version's release notes
    # https://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/emr-6110-release.html#emr-6110-app-versions
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        # Keep the cluster alive after startup: the Steps are added later by
        # EmrAddStepsOperator, and the cluster is terminated explicitly by
        # EmrTerminateJobFlowOperator at the end of the DAG
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    # NOTE: no "Steps" here; they are added by EmrAddStepsOperator below,
    # so listing them here as well would run them twice
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

DAG_ID = os.path.basename(__file__).replace(".py", "")

with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=datetime(2023, 7, 1),
    tags=["example"],
    catchup=False,
) as dag:
    # Step 1: Create the EMR cluster (job flow)
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
    # Step 2: Add Steps to the cluster
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )
    # Step 3: Wait for the Step to complete
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )
    # Step 4: Terminate the EMR cluster
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover
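The tasks are wired together via XCom: EmrCreateJobFlowOperator pushes the cluster (job flow) ID as its return_value, and EmrAddStepsOperator pushes the list of added step IDs, which is why step_id indexes [0]. A minimal sketch of the same wiring using XComArg (.output) instead of raw Jinja, under the same assumptions as the sample above:

    # cluster_creator.output resolves to the job flow ID at run time
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        # add_steps returns a list of step IDs, so take the first entry
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )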
2) Operators used
[1] EmrCreateJobFlowOperator
* Creates an EMR cluster (a "job flow")
[2] EmrAddStepsOperator
* Adds Steps to the EMR cluster
[3] EmrStepSensor
* Monitors the execution of an EMR Step
[4] EmrTerminateJobFlowOperator
* Terminates the EMR cluster (see the sketch below for terminating it even when a Step fails)
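One caveat about the sample DAG above: cluster_remover runs with the default trigger rule (all_success), so if watch_step fails, the terminate task is skipped and the cluster keeps running (and billing). A minimal sketch of a safer version, mirroring the trigger_rule="all_done" already used by delete_app in the EMR Serverless sample:

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        # Run regardless of upstream success or failure so the cluster is always terminated
        trigger_rule="all_done",
    )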
Related articles
Apache Airflow ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Creating DAGs ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000