【Airflow】Apache Airflow ~ XComs ~

■ Introduction

I expect to work a lot with XComs, the mechanism needed to pass values
between Tasks in Airflow, so I'm writing this down as a memo.

Table of Contents

【1】XComs
【2】Related APIs
 1)xcom_push
 2)xcom_pull
【3】Samples
 Example 1: Airflow v1
 Example 2: Airflow v2

【1】XComs

* XComs = "cross-communications"
* A mechanism for exchanging data between different Tasks
* XCom values are stored in Airflow's metadata DB, so they are meant for small, serializable pieces of data

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

【2】Related APIs

1)xcom_push

* Pushes (sets) a value

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html#airflow.models.taskinstance.TaskInstance.xcom_push

# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
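
Note that, besides calling xcom_push() explicitly, the return value of a PythonOperator's python_callable is pushed automatically as an XCom under the key "return_value" (as long as do_xcom_push is left at its default of True). A minimal sketch (the function name is made up for illustration):

def my_callable():
  # Pushed automatically as an XCom with key "return_value"
  return "Hello World!!"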

2)xcom_pull

* Pulls (gets) a value

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html#airflow.models.taskinstance.TaskInstance.xcom_pull

# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
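
xcom_pull() can also be called from Jinja templates via the task instance (ti), which is handy for templated operator fields. A minimal sketch (the task ids here are just examples):

from airflow.operators.bash import BashOperator

print_task = BashOperator(
  task_id="print_task",
  # Rendered at run time: pulls the value that task-1 pushed
  bash_command="echo {{ ti.xcom_pull(task_ids='task-1', key='identifier as string') }}",
)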

【3】Samples

Example 1: Airflow v1

import os
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator


def push(**context):
  context["task_instance"].xcom_push(
    key="hello", value="Hello World!!")

def pull(**context):
  # ★ Where I (personally) got stuck ★
  # I wrote the parameter "task_ids" as "task_id" and hit an exception
  result = context["task_instance"].xcom_pull(
    task_ids="push_task", key="hello")
  print(f"result = {result}")

default_args = {
  "start_date": days_ago(2),
  # Airflow v1: pass the context (e.g. task_instance) to python_callable
  "provide_context": True
}

with DAG(
  dag_id=os.path.basename(__file__).replace(".py", ""),
  description='This is a simple demo.',
  default_args=default_args,
  schedule_interval=None,
  tags=['hello_world'],
) as dag:
  job1 = PythonOperator(
    task_id='push_task',
    dag=dag,
    python_callable=push,
  )
  job2 = PythonOperator(
    task_id='pull_task',
    dag=dag,
    python_callable=pull,
  )
  job1 >> job2
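
When this DAG is triggered, the pull_task log should contain a line like "result = Hello World!!".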

Example 2: Airflow v2

* Created this sample based on the one in the following related article

https://dk521123.hatenablog.com/entry/2023/10/21/233404

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow"
}
@dag(
    default_args=default_args,
    schedule_interval="@daily",
    start_date=days_ago(2),
    tags=['example']
)
def main_dag():
    @task
    def get_hello_world(**kwargs):
        # ★ Push (set) ★
        task_instance = kwargs["task_instance"]
        # task_instance = kwargs["ti"]  # "ti" also works
        data_string = '{"name": "Mike", "age": 23, "created_at": "2023-10-23"}'
        task_instance.xcom_push(key="demo_dict", value=data_string)
        return 'hello world'

    @task
    def say_hello(result, **kwargs):
        print(result)

        # ★ Pull (get) ★
        task_instance = kwargs["task_instance"]
        # task_instance = kwargs["ti"]  # (ti = task_instance)
        extract_data_string = task_instance.xcom_pull(task_ids="get_hello_world", key="demo_dict")
        order_data = json.loads(extract_data_string)

        for value in order_data.values():
            print(value)

    result = get_hello_world()
    say_hello(result)

main_dag = main_dag()
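
In TaskFlow, a @task that returns a dict can also split each key into its own XCom via multiple_outputs=True, which avoids the manual json.dumps / json.loads round trip used above. A minimal sketch (task and variable names are made up for illustration):

from airflow.decorators import task

@task(multiple_outputs=True)
def get_user():
    # Each key is pushed as its own XCom ("name", "age")
    return {"name": "Mike", "age": 23}

@task
def say_name(name):
    print(name)

# Inside a @dag-decorated function:
user = get_user()
say_name(user["name"])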

References

https://dev.classmethod.jp/articles/read-provide-context-and-task-instance/
https://www.flywheel.jp/topics/airflow-dependency-between-dags/
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

Related Articles

Apache Airflow ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Introduction ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
MWAA Local ~ Introduction ~
https://dk521123.hatenablog.com/entry/2023/10/21/233404
Python ~ Variable-Length Arguments / *args, **kwargs ~
https://dk521123.hatenablog.com/entry/2023/11/01/000915