【Airflow】Apache Airflow ~ Getting Started ~

■ Introduction

https://dk521123.hatenablog.com/entry/2021/07/18/004531

This is a continuation of the article above.
This time, let's try a Hello World-style exercise with Apache Airflow.

Table of Contents

【1】Useful references
【2】Sample
【3】Commands
 1)Checking the code
 2)Checking the DAG
 3)Testing each task
 4)Running the DAG
 5)Starting the scheduler

【1】Useful references

* This walkthrough follows the tutorial on the official site below

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

【2】Sample

* Save the following code under "~/airflow/dags"
* That location is the one set by "dags_folder" in "~/airflow/airflow.cfg"
~~~~~~
[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/your-user-name/airflow/dags
~~~~~~
* The DAG is named "hello_world"
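
If you are not sure which folder Airflow is actually scanning, you can read the value directly from the config file. The following is a minimal sketch that uses only Python's standard configparser. The names parse_dags_folder and read_dags_folder are hypothetical, and the sketch ignores the AIRFLOW__CORE__DAGS_FOLDER environment variable, which takes precedence over the file when it is set.

```python
import configparser
import os


def parse_dags_folder(cfg_text: str) -> str:
    """Return [core] dags_folder from airflow.cfg-style text (hypothetical helper)."""
    # interpolation=None: airflow.cfg contains '%' characters (e.g. log format
    # strings) that configparser's default interpolation would try to expand.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    dags_folder = parser.get("core", "dags_folder")
    # The comment in airflow.cfg says this path must be absolute.
    if not os.path.isabs(dags_folder):
        raise ValueError(f"dags_folder must be an absolute path: {dags_folder!r}")
    return dags_folder


def read_dags_folder(cfg_path: str = "~/airflow/airflow.cfg") -> str:
    """Read the config file (default path taken from this article) and parse it."""
    with open(os.path.expanduser(cfg_path), encoding="utf-8") as f:
        return parse_dags_folder(f.read())
```

For the config shown above, print(read_dags_folder()) should print /home/your-user-name/airflow/dags.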

~/airflow/dags/hello_world_dag.py

from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
  'owner': 'your-name',
  'depends_on_past': False,
  'email': ['your-email@gmail.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  # 'queue': 'bash_queue',
  # 'pool': 'backfill',
  # 'priority_weight': 10,
  # 'end_date': datetime(2016, 1, 1),
  # 'wait_for_downstream': False,
  # 'dag': dag,
  # 'sla': timedelta(hours=2),
  # 'execution_timeout': timedelta(seconds=300),
  # 'on_failure_callback': some_function,
  # 'on_success_callback': some_other_function,
  # 'on_retry_callback': another_function,
  # 'sla_miss_callback': yet_another_function,
  # 'trigger_rule': 'all_success'
}

# Create the DAG (Directed Acyclic Graph)
# The DAG is named "hello_world"
with DAG(
  'hello_world',
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=timedelta(days=1),
  start_date=days_ago(2),
  tags=['hello_world'],
) as dag:
  # job1, job2 and job3 are examples of tasks
  #  created by instantiating operators
  job1 = BashOperator(
    task_id='printing_date',
    bash_command='date',
  )
  job2 = BashOperator(
    task_id='sleep_5',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
  )
  job1.doc_md = dedent(
    """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
  )
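  # Option 1: reuse the module docstring as the DAG documentation
  # (only works if the file begins with a docstring; this one does not, so __doc__ is None here)
  dag.doc_md = __doc__
  # Option 2: otherwise, write the documentation inline like this
  # (this second assignment overwrites the first one)
  dag.doc_md = """
    This is documentation placed anywhere
  """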
  templated_command = dedent(
    """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
  )
  job3 = BashOperator(
    task_id='templated_command',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
  )
  job1 >> [job2, job3]
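
Note that job2 passes retries=3 itself, while job1 does not. As the comment on default_args says, those values are passed to every operator but can be overridden per task. The snippet below models that precedence with plain dicts. effective_task_args is a hypothetical helper and a simplification, not the way Airflow implements it internally.

```python
from datetime import timedelta

# Simplified model of default_args vs. per-task arguments. This is NOT
# Airflow's internal implementation, only the precedence rule it follows:
# an argument passed directly to an operator wins over default_args.
default_args = {
    "owner": "your-name",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def effective_task_args(defaults: dict, **task_kwargs) -> dict:
    """Hypothetical helper: DAG-level defaults overlaid with per-task kwargs."""
    merged = dict(defaults)     # copy so the shared defaults are not mutated
    merged.update(task_kwargs)  # explicit per-task values take precedence
    return merged


job1_args = effective_task_args(default_args, task_id="printing_date")
job2_args = effective_task_args(default_args, task_id="sleep_5", retries=3)

print(job1_args["retries"])  # 1 (inherited from default_args)
print(job2_args["retries"])  # 3 (overridden in the BashOperator call)
```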

【3】Commands

1)Checking the code

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#running-the-script

# Run the following and confirm that no errors occur
python ~/airflow/dags/hello_world_dag.py
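
Running the file with python executes the module from top to bottom. This catches syntax errors and broken imports, but it does not run any task. To run the same check from Python (for example, inside a test), a small sketch using the standard runpy module could look like this. check_dag_file is a hypothetical name. Unlike `python <file>`, it skips any `if __name__ == "__main__":` block in the file.

```python
import runpy
import traceback
from typing import Optional


def check_dag_file(path: str) -> Optional[str]:
    """Execute a DAG file once, roughly like `python <path>` (hypothetical helper).

    Returns None if the file ran without raising, otherwise the traceback text.
    """
    try:
        # A run_name other than "__main__" means an `if __name__ == "__main__":`
        # block in the file is skipped, unlike running it with `python`.
        runpy.run_path(path, run_name="dag_check")
    except Exception:  # SyntaxError, ImportError, NameError, ...
        return traceback.format_exc()
    return None
```

For example, check_dag_file(os.path.expanduser("~/airflow/dags/hello_world_dag.py")) should return None when the file loads cleanly. Because the DAG file imports airflow, this check has to run in the same environment where Airflow is installed.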

2)Checking the DAG

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#command-line-metadata-validation

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "hello_world" DAG
airflow tasks list hello_world

# prints the hierarchy of tasks in the "hello_world" DAG
airflow tasks list hello_world --tree
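
With --tree, each task is listed indented under its upstream task. For this DAG, printing_date should appear first, with sleep_5 and templated_command beneath it, because of job1 >> [job2, job3]. The following standard-library sketch reproduces that hierarchy. The adjacency dict and the helper names are hypothetical stand-ins for the DAG, and the exact format of the real CLI output may differ.

```python
from typing import Dict, List

# The dependency declared by `job1 >> [job2, job3]`, written as a plain
# adjacency mapping: task_id -> list of downstream task_ids.
downstream: Dict[str, List[str]] = {
    "printing_date": ["sleep_5", "templated_command"],
    "sleep_5": [],
    "templated_command": [],
}


def roots(graph: Dict[str, List[str]]) -> List[str]:
    """Tasks that are nobody's downstream, i.e. where the DAG starts."""
    children = {c for cs in graph.values() for c in cs}
    return [t for t in graph if t not in children]


def tree_lines(graph: Dict[str, List[str]], task: str, depth: int = 0) -> List[str]:
    """Depth-first walk that indents each task under its upstream task."""
    lines = ["    " * depth + task]
    for child in graph[task]:
        lines.extend(tree_lines(graph, child, depth + 1))
    return lines


for root in roots(downstream):
    print("\n".join(tree_lines(downstream, root)))
```

This prints printing_date, followed by sleep_5 and templated_command indented one level. Because this is a tree walk, a task with several upstream tasks would be printed once under each of them.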

3)Testing each task

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#id2

airflow tasks test hello_world printing_date 2021-07-24

airflow tasks test hello_world sleep_5 2021-07-24

airflow tasks test hello_world templated_command 2021-07-24
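
`airflow tasks test` runs a single task instance locally. It ignores dependencies and does not record state in the database. For templated_command, the date argument (2021-07-24 here) becomes {{ ds }}, so the Jinja template expands into a series of echo commands. To see what gets echoed without installing Jinja2, here is a hand-expanded sketch. ds_add below is my own stand-in for Airflow's macros.ds_add, which shifts a YYYY-MM-DD string by a number of days. It is not the library function itself.

```python
from datetime import datetime, timedelta


def ds_add(ds: str, days: int) -> str:
    """Stand-in for macros.ds_add: shift a YYYY-MM-DD string by `days` days."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_templated_command(ds: str, my_param: str) -> str:
    """Hand-written equivalent of the Jinja loop in templated_command."""
    lines = []
    for _ in range(5):                              # {% for i in range(5) %}
        lines.append(f'echo "{ds}"')                # {{ ds }}
        lines.append(f'echo "{ds_add(ds, 7)}"')     # {{ macros.ds_add(ds, 7) }}
        lines.append(f'echo "{my_param}"')          # {{ params.my_param }}
    return "\n".join(lines)


print(render_templated_command("2021-07-24", "Parameter I passed in"))
```

For 2021-07-24, each of the five iterations echoes 2021-07-24, 2021-07-31 (seven days later), and "Parameter I passed in".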

4)Running the DAG

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#backfill

airflow dags backfill hello_world \
    --start-date 2021-07-24 \
    --end-date 2021-07-26
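
With schedule_interval=timedelta(days=1), the backfill above should create one DAG run per day in the range. Assuming --start-date and --end-date are both inclusive (worth confirming with `airflow dags backfill --help` for your version), that gives runs for 2021-07-24, 2021-07-25 and 2021-07-26. Each run executes printing_date first and then sleep_5 and templated_command. The following is a sketch of the date enumeration only. backfill_dates is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import List


def backfill_dates(start: date, end: date, interval: timedelta) -> List[date]:
    """Logical dates a backfill would cover, assuming both ends are inclusive."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval  # step by the DAG's schedule_interval
    return dates


for d in backfill_dates(date(2021, 7, 24), date(2021, 7, 26), timedelta(days=1)):
    print(d.isoformat())
```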

5)Starting the scheduler

airflow scheduler
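
Once the scheduler is running, it creates runs according to schedule_interval. One point that often trips people up: the run for an interval is triggered only after that interval has ended. So with start_date=days_ago(2) and a daily schedule, the scheduler should catch up on the two already-closed days, assuming the default catchup=True. New DAGs also start paused by default (dags_are_paused_at_creation), so you may first need to unpause hello_world in the UI or with `airflow dags unpause hello_world`. The sketch below models only the timing rule. It uses naive datetimes for simplicity, and scheduled_runs is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def scheduled_runs(
    start: datetime, interval: timedelta, now: datetime
) -> List[Tuple[datetime, datetime]]:
    """(logical_date, earliest_trigger_time) for every interval closed by `now`.

    Models the rule that the run for [t, t + interval) is only triggered
    once that interval has ended (catchup assumed to be enabled).
    """
    runs = []
    logical = start
    while logical + interval <= now:
        runs.append((logical, logical + interval))
        logical += interval
    return runs


# start_date = days_ago(2) evaluated at 2021-07-24 12:00 -> 2021-07-22 00:00
runs = scheduled_runs(datetime(2021, 7, 22), timedelta(days=1), now=datetime(2021, 7, 24, 12, 0))
for logical_date, trigger_after in runs:
    print(f"run for {logical_date:%Y-%m-%d} is triggered after {trigger_after:%Y-%m-%d %H:%M}")
```

This prints two runs: 2021-07-22, triggered after 2021-07-23 00:00, and 2021-07-23, triggered after 2021-07-24 00:00. The run for 2021-07-24 has to wait until that day is over.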

References

https://qiita.com/chan-p/items/526bbed95fdc73142c59

■ Related articles

Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ Environment Setup / Docker ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ CLI
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Retries ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ Timeouts
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ Concurrency / Parallelism ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable / Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ Miscellaneous ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ Notifications: Miscellaneous ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ Notification Samples ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000