■ Introduction
https://dk521123.hatenablog.com/entry/2021/07/18/004531
https://dk521123.hatenablog.com/entry/2021/07/24/233012
This is a continuation of the articles above. Here are some notes on the basic architecture of Apache Airflow.
Table of Contents
【1】Basic architecture of Airflow
 1) Web Server
 2) Scheduler
 3) Executor
【2】Operator
 1) BashOperator
 2) PythonOperator
 3) EmailOperator
 4) Sensor
【3】DAG (Directed Acyclic Graph)
 1) How to define a DAG
 2) Specifying dependencies in a DAG
【4】Sample
【1】Basic architecture of Airflow
Airflow consists of the following three components: 1) Web Server 2) Scheduler 3) Executor
1)Web Server
* Serves the management UI (web console)
* Implemented with Flask, a Python web application framework
2)Scheduler
* Manages the scheduling of job execution
3)Executor
* Executes the jobs (tasks)
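Which executor is used is a configuration choice. A minimal sketch, assuming a standard Airflow 2.x installation (the LocalExecutor value below is only an example): the executor is set in airflow.cfg or via an environment variable.

# airflow.cfg (excerpt)
[core]
# SequentialExecutor (default with SQLite), LocalExecutor,
# CeleryExecutor, KubernetesExecutor, etc.
executor = LocalExecutor

# The same setting can also be supplied via an environment variable:
# export AIRFLOW__CORE__EXECUTOR=LocalExecutor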
【2】Operator
* Classes that define a single unit of work (a job/task) in an ETL pipeline
* ETL processing is defined by wiring these operators together in a DAG
https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/index.html
1)BashOperator
* Class for executing a Bash shell command
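A minimal sketch of how a BashOperator is declared, assuming Airflow 2.x (the DAG id, task id, and command are arbitrary examples):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'bash_operator_demo',            # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,          # run only when triggered manually
    catchup=False,
) as dag:
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date',         # any shell command string works here
    )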
2)PythonOperator
* Class for executing a Python callable (function)
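Likewise, a minimal PythonOperator sketch, assuming Airflow 2.x (say_hello and the ids are arbitrary examples):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print('hello world')

with DAG(
    'python_operator_demo',          # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    hello_task = PythonOperator(
        task_id='say_hello',
        python_callable=say_hello,   # called when the task runs
    )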
3)EmailOperator
* Class for sending email => For details, see the related article below.
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
4)Sensor
* Classes that wait until a condition is met, e.g. until a specified time has passed or until a file becomes available
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html
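For example, FileSensor keeps polling until a given file exists. A minimal sketch, assuming Airflow 2.x (the connection id, file path, and intervals are arbitrary examples):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

with DAG(
    'file_sensor_demo',                  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        fs_conn_id='fs_default',         # default filesystem connection
        filepath='/tmp/input/data.csv',  # hypothetical file to wait for
        poke_interval=30,                # check every 30 seconds
        timeout=60 * 60,                 # fail if not found within 1 hour
    )
    process_file = BashOperator(
        task_id='process_file',
        bash_command='echo "file arrived"',
    )

    wait_for_file >> process_file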
【3】DAG (Directed Acyclic Graph)
https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
1)How to define a DAG
Notation 1
with DAG("my_dag_name") as dag: op = DummyOperator(task_id="task")
Notation 2
my_dag = DAG("my_dag_name")
op = DummyOperator(task_id="task", dag=my_dag)
Notation 3
@dag(start_date=days_ago(2))
def generate_dag():
    op = DummyOperator(task_id="task")

dag = generate_dag()
2)Specifying dependencies in a DAG
Airflow 1.8 and later
job1 >> job2
# 「job2 << job1」でも可能
Versions before Airflow 1.8
job1.set_downstream(job2)
# "job2.set_upstream(job1)" also works
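The >> / << operators also accept lists of tasks, which makes fan-out and fan-in easy to express. A minimal sketch, assuming job1 through job4 are operator instances in the same DAG:

job1 >> [job2, job3]    # job2 and job3 both run after job1 (fan-out)
[job2, job3] >> job4    # job4 runs only after both job2 and job3 succeed (fan-in)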
【4】Sample
from datetime import timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
import logging
from airflow.utils.trigger_rule import TriggerRule


# The hello world function called by job4 below
def say_hello():
    logging.info('hello world')


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'your-name',
    'depends_on_past': False,
    'email': ['your-email@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'hello_world',
    default_args=default_args,
    description='This is a simple demo DAG for Hello World',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['hello_world'],
) as dag:
    # job1, job2 and job3 are examples of tasks
    # created by instantiating operators
    job1 = BashOperator(
        task_id='printing_date',
        bash_command='date',
    )

    job2 = BashOperator(
        task_id='sleep_5',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    job1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    job3 = BashOperator(
        task_id='templated_command',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    # PythonOperator that calls the hello world function
    job4 = PythonOperator(
        task_id='say_hello',
        python_callable=say_hello,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=dag
    )

    job1 >> job2 >> job4
    job1 >> job3 >> job4
References
https://future-architect.github.io/articles/20200131/
https://analytics.livesense.co.jp/entry/2018/02/06/132842
Related articles
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 環境構築 / Kubernetes 編 ~
https://dk521123.hatenablog.com/entry/2023/05/13/000000
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ DAG作成 ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000
Apache Airflow ~ DAGの引数 ~
https://dk521123.hatenablog.com/entry/2022/01/13/101634
Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751
Apache Airflow ~ XComs ~
https://dk521123.hatenablog.com/entry/2023/10/31/000133
Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005
Apache Airflow ~ Task status ~
https://dk521123.hatenablog.com/entry/2024/12/04/093858
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Backfill ~
https://dk521123.hatenablog.com/entry/2024/10/05/234219
Apache Airflow ~ Catchup ~
https://dk521123.hatenablog.com/entry/2024/10/06/205302
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/17/000000
Apache Airflow ~ Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow ~ 単体試験 / 環境設定編 ~
https://dk521123.hatenablog.com/entry/2024/06/16/002630
Apache Airflow ~ 単体試験 / 入門編 ~
https://dk521123.hatenablog.com/entry/2024/06/13/222403