■ Introduction
Since I expect to deal a lot with XComs, which are needed to pass values between Tasks in Airflow, I'm writing this down as a note.
Table of Contents
【1】XComs
【2】Related APIs
 1)xcom_push
 2)xcom_pull
【3】Samples
 Example 1: Airflow v1
 Example 2: Airflow v2
【1】XComs
* XComs = cross-communications
* A mechanism for exchanging data between different Tasks
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
【2】Related APIs
1)xcom_push
* Pushes (sets) data
# pushes data in any_serializable_value into XCom with key "identifier as a string"
task_instance.xcom_push(
    key="identifier as a string", value=any_serializable_value)
2)xcom_pull
* Pulls (retrieves) data
# pulls the XCom value with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(
    key="identifier as string", task_ids="task-1")
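As a supplementary note: when `key` is omitted, `xcom_pull` defaults to the implicit key "return_value", which is where Airflow stores a task's return value. The addressing model, one value per (task_id, key) pair, can be illustrated with a toy stand-in. This is NOT the Airflow API, just a self-contained sketch of the lookup behavior:

```python
# Toy stand-in for the XCom store: NOT the real Airflow API,
# only an illustration of how values are addressed by (task_id, key).
class FakeXComStore:
    def __init__(self):
        self._store = {}

    def xcom_push(self, task_id, key, value):
        # one value per (task_id, key) pair; a later push overwrites
        self._store[(task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        # like Airflow, the default key is "return_value"
        return self._store.get((task_ids, key))


store = FakeXComStore()
store.xcom_push("task-1", "identifier as string", "some value")
store.xcom_push("task-1", "return_value", "returned value")

print(store.xcom_pull(task_ids="task-1", key="identifier as string"))
print(store.xcom_pull(task_ids="task-1"))  # falls back to "return_value"
```

Pulling with a (task_id, key) pair that was never pushed returns None rather than raising, which mirrors why a typo like `task_id=` instead of `task_ids=` is easy to trip over in the real API.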
【3】Samples
Example 1: Airflow v1
import os

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def push(**context):
    context["task_instance"].xcom_push(
        key="hello", value="Hello World!!")


def pull(**context):
    # ★ (Personal note) where I got stuck ★
    # I wrote the parameter "task_ids" as "task_id" and hit an exception
    result = context["task_instance"].xcom_pull(
        task_ids="push_task", key="hello")
    print(f"result = {result}")


default_args = {
    "start_date": days_ago(2),
    "provide_context": True
}

with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    description='This is a simple demo.',
    default_args=default_args,
    schedule_interval=None,
    tags=['hello_world'],
) as dag:
    job1 = PythonOperator(
        task_id='push_task',
        dag=dag,
        python_callable=push,
    )
    job2 = PythonOperator(
        task_id='pull_task',
        dag=dag,
        python_callable=pull,
    )

    job1 >> job2
Example 2: Airflow v2
* This sample is based on the sample in the related article below
https://dk521123.hatenablog.com/entry/2023/10/21/233404
import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow"
}


@dag(
    default_args=default_args,
    schedule_interval="@daily",
    start_date=days_ago(2),
    tags=['example']
)
def main_dag():
    @task
    def get_hello_world(**kwargs):
        # ★ Push ★
        task_instance = kwargs["task_instance"]
        # task_instance = kwargs["ti"]  # (ti = task_instance)
        data_string = '{"name": "Mike", "age": 23, "created_at": "2023-10-23"}'
        task_instance.xcom_push("demo_dict", data_string)
        return 'hello world'

    @task
    def say_hello(result, **kwargs):
        print(result)
        # ★ Pull ★
        task_instance = kwargs["task_instance"]
        # task_instance = kwargs["ti"]  # (ti = task_instance)
        extract_data_string = task_instance.xcom_pull(
            task_ids="get_hello_world", key="demo_dict")
        order_data = json.loads(extract_data_string)
        for value in order_data.values():
            print(value)

    result = get_hello_world()
    say_hello(result)


main_dag = main_dag()
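Note that `return 'hello world'` is itself passed between the tasks via XCom: with the TaskFlow API, a task's return value is pushed automatically and arrives in `say_hello` as the `result` argument. The JSON round-trip that `say_hello` performs on the pulled string can also be checked in isolation with only the standard library, using the same literal string the sample pushes:

```python
import json

# same JSON string that get_hello_world pushes via xcom_push
data_string = '{"name": "Mike", "age": 23, "created_at": "2023-10-23"}'

# what say_hello does after pulling: parse the string, then print each value
order_data = json.loads(data_string)
for value in order_data.values():
    print(value)
# prints: Mike / 23 / 2023-10-23 (one per line)
```

The sample pushes the data as a JSON string and parses it on the pull side; depending on your Airflow 2 configuration, pushing the dict directly may also work since XCom values are JSON-serialized by default, but that is an assumption worth verifying against your version.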
References
https://dev.classmethod.jp/articles/read-provide-context-and-task-instance/
https://www.flywheel.jp/topics/airflow-dependency-between-dags/
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
Related Articles
Apache Airflow ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
MWAA Local ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/10/21/233404
Python ~ Variadic Arguments / *args, **kwargs ~
https://dk521123.hatenablog.com/entry/2023/11/01/000915