■ はじめに
久しぶりに、AirflowのDAGを作成することになったのだが 大分時間が経ってしまったので、ほとんど覚えておらず。。。 なので、今回は、そんなダメな自分への備忘録として DAGを作成することに特化したものを作る。 完全なリハビリ(復習)みたいな回。
目次
【1】サンプル 例1:Airflow 1.0版 例2:Airflow 2.0版(デコレータ)
【1】サンプル
例1:Airflow 1.0版
import logging from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule def say_hello(): logging.info('hello world') default_args = { 'owner': 'your-name', 'depends_on_past': False, 'start_date': datetime(2023, 7, 1), 'catchup': False, 'schedule_interval': '@daily', 'retries': 1 } # DAG(Directed acyclic graph;有向非巡回グラフ)を作成する # 名前は「hello_world」とする(★ここは必ず一意になるようにする★) with DAG('hello_world', default_args=default_args, tags=['hello_world'],) as dag: job1 = BashOperator( task_id='printing_date', bash_command='date', ) job2 = PythonOperator( task_id='say_hello', python_callable=say_hello, trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag ) job1 >> job2
例2:Airflow 2.0版(デコレータ)
import airflow from airflow.decorators import dag, task from airflow.utils.dates import days_ago default_args = { "owner": "airflow" } @dag( default_args=default_args, schedule_interval="@daily", start_date=days_ago(2), tags=['example'] ) def main_dag(): @task def get_hello_world(): return 'hello world' @task def say_hello(result): print(result) result = get_hello_world() say_hello(result) main_dag = main_dag()
参考文献
https://note.com/dd_techblog/n/n6f00d8b51457
関連記事
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319