【Airflow】Apache Airflow ~ Basics ~

■ Introduction

This entry continues from:

https://dk521123.hatenablog.com/entry/2021/07/18/004531
https://dk521123.hatenablog.com/entry/2021/07/24/233012

Here I note the basic architecture of Apache Airflow and related basics.

Table of Contents

【1】Basic Architecture of Airflow
 1)Web Server
 2)Scheduler
 3)Executor
【2】Operator
 1)BashOperator
 2)PythonOperator
 3)EmailOperator
 4)Sensor
【3】DAG (Directed Acyclic Graph)
 1)How to define a DAG
 2)Specifying dependencies within a DAG
【4】Sample

【1】Basic Architecture of Airflow

Airflow consists of the following three components:

1)Web Server
2)Scheduler
3)Executor

1)Web Server

* Serves the management UI
* Implemented with Flask, a Python web application framework

2)Scheduler

* Manages when jobs are scheduled to run

3)Executor

* Runs the jobs

【2】Operator

* A class that defines one unit of work (a job) in an ETL pipeline
* ETL processing is defined by wiring these operators together in a DAG

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/index.html

1)BashOperator

* Class for executing Bash commands

2)PythonOperator

* Class for executing Python callables

3)EmailOperator

* Class for sending emails
 => For details, see the related article below.

Apache Airflow ~ Notifications ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323

4)Sensor

* Class that waits until a condition is met (e.g., until a given time passes or a file becomes available)

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html
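The poke-style behavior of a sensor can be illustrated with a toy sketch in plain Python (an illustration only, not Airflow's actual implementation): re-check a condition at a fixed interval until it succeeds or a timeout elapses.

```python
import time

def toy_sensor(condition, poke_interval=0.01, timeout=1.0):
    """Toy poke-style sensor: call `condition()` every
    `poke_interval` seconds until it returns True, or raise
    TimeoutError once `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poke_interval)
    raise TimeoutError("sensor timed out")

# Example: a condition that becomes true on the 3rd poke
# (stands in for "the file finally exists")
state = {"count": 0}
def ready():
    state["count"] += 1
    return state["count"] >= 3

print(toy_sensor(ready))  # True
```

A real Airflow sensor (e.g., FileSensor) works on the same principle, with `poke_interval` and `timeout` as constructor arguments.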

【3】DAG (Directed Acyclic Graph)

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html

1)How to define a DAG

Notation 1

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG("my_dag_name") as dag:
  op = DummyOperator(task_id="task")

Notation 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator

my_dag = DAG("my_dag_name")
op = DummyOperator(task_id="task", dag=my_dag)

Notation 3

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

@dag(start_date=days_ago(2))
def generate_dag():
  op = DummyOperator(task_id="task")

dag = generate_dag()

2)Specifying dependencies within a DAG

Airflow 1.8 and later

job1 >> job2

# "job2 << job1" also works

Versions before Airflow 1.8

job1.set_downstream(job2)

# "job2.set_upstream(job1)" also works
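How the ">>" / "<<" notation can be made to record dependencies is sketched below in plain Python (a toy illustration, not Airflow's actual classes): the shift operators are overloaded to call set_downstream/set_upstream, and returning the right-hand task allows chains like job1 >> job2 >> job3.

```python
class ToyTask:
    """Toy task that records upstream/downstream links."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        self.downstream.add(other)
        other.upstream.add(self)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # return `other` so chaining works

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

job1, job2, job3 = ToyTask("job1"), ToyTask("job2"), ToyTask("job3")
job1 >> job2 >> job3               # chain: job1 -> job2 -> job3
print(job2.upstream == {job1})     # True
```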

【4】Sample

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
import logging

from airflow.utils.trigger_rule import TriggerRule

# Hello world function called by the PythonOperator below
def say_hello():
  logging.info('hello world')

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
  'owner': 'your-name',
  'depends_on_past': False,
  'email': ['your-email@gmail.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  # 'queue': 'bash_queue',
  # 'pool': 'backfill',
  # 'priority_weight': 10,
  # 'end_date': datetime(2016, 1, 1),
  # 'wait_for_downstream': False,
  # 'dag': dag,
  # 'sla': timedelta(hours=2),
  # 'execution_timeout': timedelta(seconds=300),
  # 'on_failure_callback': some_function,
  # 'on_success_callback': some_other_function,
  # 'on_retry_callback': another_function,
  # 'sla_miss_callback': yet_another_function,
  # 'trigger_rule': 'all_success'
}

with DAG(
  'hello_world',
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=timedelta(days=1),
  start_date=days_ago(2),
  tags=['hello_world'],
) as dag:
  # job1, job2 and job3 are examples of tasks
  #  created by instantiating operators
  job1 = BashOperator(
    task_id='printing_date',
    bash_command='date',
  )
  job2 = BashOperator(
    task_id='sleep_5',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
  )
  job1.doc_md = dedent(
    """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
  )
  dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
  dag.doc_md = """
    This is a documentation placed anywhere
  """  # otherwise, type it like this
  templated_command = dedent(
    """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
  )
  job3 = BashOperator(
    task_id='templated_command',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
  )
  # PythonOperator for hello world
  job4 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
  )

  job1 >> job2 >> job4
  job1 >> job3 >> job4

References

https://future-architect.github.io/articles/20200131/
https://analytics.livesense.co.jp/entry/2018/02/06/132842

Related articles

Apache Airflow ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Environment Setup with Docker ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Retry ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ Timeout ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ Concurrency / Parallelism ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ Miscellaneous ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ Notifications ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ Notification Samples ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000