【Airflow】Apache Airflow ~ Sensor ~

■ はじめに

https://dk521123.hatenablog.com/entry/2021/10/04/230703

で、S3 Sensorを取り扱ったが
Apache Airflow の Sensor (センサ) 自体を扱うことになったので
メモる。

目次

【1】Sensor
【2】主なSensor 
 1)ExternalTaskSensor
 2)FileSensor
【3】センサのmodeプロパティ
 1)pokeモード
 2)rescheduleモード
 3)使い分け
【4】独自のSensor を作るには
 1)pokeメソッド
【5】サンプル
 例1:PythonSensor
 例2:sensorデコレータ

【1】Sensor

* Airflowのオペレータの一種
* 指定時間、ファイル等を取得できるまで
 待つ(ポーリング polling)ためのクラス
 => 何かが探知するためにも使える

【2】主なSensor

1)ExternalTaskSensor
2)FileSensor
3)PythonSensor

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/index.html

1)ExternalTaskSensor

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/external_task_sensor.html#externaltasksensor

2)FileSensor

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/file.html

3)PythonSensor

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonsensor

【3】センサのmodeプロパティ

* 待ち方の指定
* 種類については、以下の通り。

1)poke(default)
2)reschedule

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html

1)pokeモード

* 実行時間中、ワーカースロットを占有する

cf. poke (ポーク) = つつく、かき立てる、ちょっかいをかける

2)rescheduleモード

* センサーはチェックするときだけワーカースロットを占有し、
 その間は、Airflowのスケジュールの仕組みで待つので、
 待っている間は、他のタスクに割り当てて実行することが可能

3)使い分け

* 以下は、公式ドキュメントを読んだ自分なりの理解。
 => pokeモードだとスロット専有してしまい、
  その間、他のタスクが実行できないので、
  基本、「rescheduleモード」で良さそう、、、

[1] pokeモードの場合

* タスクインスタンス内でsleepして待ち
 スロット(同時実行の制限)専有するので、
 1秒ごとにチェックするような周期が短いセンサに有効

[2] rescheduleモード

* 1分ごとにチェックするような周期が長くて済むセンサに有効

公式ドキュメント
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html

より抜粋
~~~~~~~~~~~~~~
Something that is checking every second should be in poke mode,
while something that is checking every minute should be in reschedule mode.

1秒ごとにチェックするようなものはpokeモードに、
1分ごとにチェックするようなものはrescheduleモードにすべきです。
~~~~~~~~~~~~~~

【4】独自のSensor を作るには

* BaseSensorOperatorクラスを継承
* 何かが起きたかチェックする pokeメソッド を実装する

1)pokeメソッド

* 何かが起きたかチェック

cf. poke (ポーク) = つつく、かき立てる、ちょっかいをかける

https://airflow.apache.org/docs/apache-airflow/2.5.1/_modules/airflow/sensors/base.html#BaseSensorOperator.poke

def poke(self, context: Context) -> bool | PokeReturnValue
"""Function defined by the sensors while deriving this class should override."""

PokeReturnValue

* poke メソッドの戻り値で、以下を持つ
~~~~
 + is_done: 終わったか(True/False)
 + xcom_value: DAG間の受け渡しのためのxcom
~~~~

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/sensors/base.html#PokeReturnValue

class PokeReturnValue:
    """
    Optional return value for poke methods.

    Sensors can optionally return an instance of the PokeReturnValue class in the poke method.
    If an XCom value is supplied when the sensor is done, then the XCom value will be
    pushed through the operator return value.
    :param is_done: Set to true to indicate the sensor can stop poking.
    :param xcom_value: An optional XCOM value to be returned by the operator.
    """
    def __init__(self, is_done: bool, xcom_value: Any | None = None) -> None:

【5】サンプル

例1:PythonSensor

* 以下のPythonSensor  をコード例を見るとイメージ掴みやすいかも

https://airflow.apache.org/docs/apache-airflow/2.5.0/_modules/airflow/sensors/python.html#PythonSensor

例2:sensorデコレータ

* 以下の公式ドキュメントを参照

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/example_sensor_decorator.html

参考文献

https://qiita.com/notrogue/items/05f1d27c6c2bb3e5df56
https://docs.astronomer.io/learn/what-is-a-sensor

関連記事

Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ あれこれ編 ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020
MWAA ~ S3 Sensor 編 ~
https://dk521123.hatenablog.com/entry/2021/10/04/230703
MWAA Local ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/21/233404