【Airflow】Apache Airflow ~ DAG作成 ~

■ はじめに

久しぶりに、AirflowのDAGを作成することになったのだが
大分時間が経ってしまったので、ほとんど覚えておらず。。。

なので、今回は、そんなダメな自分への備忘録として
DAGを作成することに特化したものを作る。
完全なリハビリ(復習)みたいな回。

目次

【1】サンプル
 例1:Airflow 1.0版
 例2:Airflow 2.0版(デコレータ)

【1】サンプル

例1:Airflow 1.0版

import logging

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def say_hello():
  logging.info('hello world')

default_args = {
  'owner': 'your-name',
  'depends_on_past': False,
  'start_date': datetime(2023, 7, 1),
  'catchup': False,
  'schedule_interval': '@daily',
  'retries': 1
}

# DAG(Directed acyclic graph;有向非巡回グラフ)を作成する
# 名前は「hello_world」とする(★ここは必ず一意になるようにする★)
with DAG('hello_world', default_args=default_args, tags=['hello_world'],) as dag:
  job1 = BashOperator(
    task_id='printing_date',
    bash_command='date',
  )
  job2 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
  )

  job1 >> job2

例2:Airflow 2.0版(デコレータ)

import airflow
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
  "owner": "airflow"
}
@dag(
  default_args=default_args,
  schedule_interval="@daily",
  start_date=days_ago(2),
  tags=['example']
)
def main_dag():
  @task
  def get_hello_world():
     return 'hello world'

  @task
  def say_hello(result):
    print(result)

  result = get_hello_world()
  say_hello(result)

main_dag = main_dag()

参考文献

https://note.com/dd_techblog/n/n6f00d8b51457

関連記事

Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319