■ はじめに
https://dk521123.hatenablog.com/entry/2021/07/18/004531
の続き。 今回は、Apache Airflow に関する Hello world的なことを行ってみる。
目次
【1】参考になるサイト 【2】サンプル 【3】実行コマンド 1)コードの確認 2)DAGの確認 3)各タスクのテスト 4)DAGを実行する 5)スケジューラ起動
【1】参考になるサイト
* 以下の公式サイトのチュートリアルを参考に行う
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
【2】サンプル
* 以下のコードを「~/airflow/dags」配下に保存する * 「~/airflow/airflow.cfg」の「dags_folder」に保存先が設定されている ~~~~~~ [core] # The folder where your airflow pipelines live, most likely a # subfolder in a code repository. This path must be absolute. dags_folder = /home/your-user-name/airflow/dags ~~~~~~ * 名前は「hello_world」とする
~/airflow/dags/hello_world_dag.py
from datetime import timedelta from textwrap import dedent # The DAG object; we'll need this to instantiate a DAG from airflow import DAG # Operators; we need this to operate! from airflow.operators.bash import BashOperator from airflow.utils.dates import days_ago # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args = { 'owner': 'your-name', 'depends_on_past': False, 'email': ['your-email@gmail.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'dag': dag, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' } # DAG(Directed acyclic graph;有向非巡回グラフ)を作成する # 名前は「hello_world」とする with DAG( 'hello_world', default_args=default_args, description='This is a simple demo DAG for Hello World', schedule_interval=timedelta(days=1), start_date=days_ago(2), tags=['hello_world'], ) as dag: # job1, job2 and job3 are examples of tasks # created by instantiating operators job1 = BashOperator( task_id='printing_date', bash_command='date', ) job2 = BashOperator( task_id='sleep_5', depends_on_past=False, bash_command='sleep 5', retries=3, ) job1.doc_md = dedent( """\ #### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Instance Details page. ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) """ ) dag.doc_md = __doc__ # providing that you have a docstring at the beggining of the DAG dag.doc_md = """ This is a documentation placed anywhere """ # otherwise, type it like this templated_command = dedent( """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ ) job3 = BashOperator( task_id='templated_command', depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, ) job1 >> [job2, job3]
【3】実行コマンド
1)コードの確認
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#running-the-script
# 以下を実行し、エラーが出ないことを確認
python ~/airflow/dags/hello_world_dag.py
2)DAGの確認
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#command-line-metadata-validation
# print the list of active DAGs airflow dags list # prints the list of tasks in the "hello_world" DAG airflow tasks list hello_world # prints the hierarchy of tasks in the "hello_world" DAG airflow tasks list hello_world --tree
3)各タスクのテスト
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#id2
airflow tasks test hello_world printing_date 2021-07-24 airflow tasks test hello_world sleep_5 2021-07-24 airflow tasks test hello_world templated_command 2021-07-24
4)DAGを実行する
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#backfill
airflow dags backfill hello_world \ --start-date 2021-07-24 \ --end-date 2021-07-26
5)スケジューラ起動
airflow scheduler
参考文献
https://qiita.com/chan-p/items/526bbed95fdc73142c59
■ 関連記事
Apache Airflow ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 環境構築 / Docker 編 ~
https://dk521123.hatenablog.com/entry/2021/10/11/134840
Apache Airflow ~ 環境構築 / Kubernetes 編 ~
https://dk521123.hatenablog.com/entry/2023/05/13/000000
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ DAG作成 ~
https://dk521123.hatenablog.com/entry/2023/07/01/000000
Apache Airflow ~ DAGの引数 ~
https://dk521123.hatenablog.com/entry/2022/01/13/101634
Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751
Apache Airflow ~ XComs ~
https://dk521123.hatenablog.com/entry/2023/10/31/000133
Apache Airflow ~ 実行タイミング ~
https://dk521123.hatenablog.com/entry/2022/01/15/014005
Apache Airflow ~ Task status ~
https://dk521123.hatenablog.com/entry/2024/12/04/093858
Apache Airflow ~ CLI ~
https://dk521123.hatenablog.com/entry/2021/10/21/130702
Apache Airflow ~ Backfill ~
https://dk521123.hatenablog.com/entry/2024/10/05/234219
Apache Airflow ~ Catchup ~
https://dk521123.hatenablog.com/entry/2024/10/06/205302
Apache Airflow ~ リトライ ~
https://dk521123.hatenablog.com/entry/2021/10/10/000000
Apache Airflow ~ タイムアウト ~
https://dk521123.hatenablog.com/entry/2021/10/12/000000
Apache Airflow ~ 同時実行 / 並列関連 ~
https://dk521123.hatenablog.com/entry/2021/10/19/144148
Apache Airflow ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/17/000000
Apache Airflow ~ Connection ~
https://dk521123.hatenablog.com/entry/2021/10/16/000454
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
Apache Airflow ~ 通知あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/10/06/141323
Apache Airflow ~ 通知サンプル編 ~
https://dk521123.hatenablog.com/entry/2021/10/09/000000
Apache Airflow ~ 単体試験 / 環境設定編 ~
https://dk521123.hatenablog.com/entry/2024/06/16/002630
Apache Airflow ~ 単体試験 / 入門編 ~
https://dk521123.hatenablog.com/entry/2024/06/13/222403
MWAA ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101
MWAA ~ Variable ~
https://dk521123.hatenablog.com/entry/2023/12/28/002530
MWAA Local ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2021/11/05/233309
MWAA Local ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/21/233404
MWAA Local ~ AWSに繋げるようにするには ~
https://dk521123.hatenablog.com/entry/2023/10/28/184431