【Airflow】MWAA ~ S3 Sensor ~

■ Introduction

This post is a continuation of:

https://dk521123.hatenablog.com/entry/2021/09/29/131101

This time, I looked into the S3 Sensor used on the following site.

https://amazon-mwaa-for-analytics.workshop.aws/en/workshop-2.0.2/m1-processing/s3.html

Table of Contents

【1】S3 Sensor
 1)S3KeySensor
 2)S3KeySizeSensor
 3)S3KeysUnchangedSensor
 4)S3PrefixSensor
【2】Usage notes

【1】S3 Sensor

The sensors related to S3 are as follows.

1)S3KeySensor
2)S3KeySizeSensor
3)S3KeysUnchangedSensor
4)S3PrefixSensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3.html

1)S3KeySensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3_key/index.html#airflow.providers.amazon.aws.sensors.s3_key.S3KeySensor
Sample
https://github.com/apache/airflow/blob/8fa976e514a411a1ecefa011f57416cf1fe09702/tests/providers/amazon/aws/sensors/test_s3_key.py#L128

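from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor

# Assumes a DAG object named `dag` is defined elsewhere.
s3_sensor = S3KeySensor(
 task_id='s3_key_sensor_task',
 poke_interval=60 * 30, # seconds; check for the key every 30 minutes
 timeout=60 * 60 * 12, # seconds; give up after 12 hours
 bucket_key="s3://your-bucket-name/data/raw", # full s3:// URL, so bucket_name stays None
 bucket_name=None,
 wildcard_match=False, # treat bucket_key as an exact key, not a Unix wildcard
 dag=dag)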
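
The sample passes a full s3:// URL as bucket_key and leaves bucket_name as None. As a rough sketch of what that means, the following standard-library-only code splits such a URL into bucket and key, and shows how a key could be matched either exactly or as a Unix wildcard. This is not the provider's actual code: split_s3_url, key_matches, and the key listing are hypothetical names and data made up for this illustration.

```python
from fnmatch import fnmatchcase
from urllib.parse import urlparse


def split_s3_url(bucket_key, bucket_name=None):
    """Return (bucket, key). Hypothetical helper, not part of Airflow."""
    if bucket_name is None:
        # No bucket given, so bucket_key is expected to be a full s3:// URL.
        parsed = urlparse(bucket_key)
        if parsed.scheme != "s3" or not parsed.netloc:
            raise ValueError("bucket_key must be a full s3:// URL when bucket_name is None")
        return parsed.netloc, parsed.path.lstrip("/")
    # Bucket given explicitly, so bucket_key is a path relative to the bucket root.
    return bucket_name, bucket_key


def key_matches(existing_keys, key, wildcard_match=False):
    """True if key exists exactly, or (with wildcard_match) matches as a Unix wildcard."""
    if wildcard_match:
        return any(fnmatchcase(k, key) for k in existing_keys)
    return key in existing_keys


bucket, key = split_s3_url("s3://your-bucket-name/data/raw")
keys_in_bucket = ["data/raw/green/a.csv", "data/raw/yellow/b.csv"]  # made-up listing
exact = key_matches(keys_in_bucket, key)
wild = key_matches(keys_in_bucket, "data/raw/*/*.csv", wildcard_match=True)
```

In this sketch, exact is False because data/raw is only the leading part of the listed keys and not an object key itself, while the wildcard pattern does match. If the goal is to wait for anything under data/raw, a wildcard key or a prefix-based sensor is probably the better fit.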

2)S3KeySizeSensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3_key/index.html#airflow.providers.amazon.aws.sensors.s3_key.S3KeySizeSensor




3)S3KeysUnchangedSensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3_keys_unchanged/index.html#airflow.providers.amazon.aws.sensors.s3_keys_unchanged.S3KeysUnchangedSensor

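* Checks whether the number of objects under a prefix has changed, and succeeds once no objects have been added for a set inactivity period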
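
The idea of "the object count has stopped changing" can be sketched in plain Python as below. This is a simplified simulation, not the provider's implementation: the class name UnchangedTracker, its fields, and the observation data are all hypothetical, and only the notion of an inactivity period (seconds without a change in the count) is taken from the sensor.

```python
class UnchangedTracker:
    """Tracks object counts across pokes (hypothetical, for illustration only)."""

    def __init__(self, inactivity_period):
        self.inactivity_period = inactivity_period  # seconds without a change
        self.previous_count = 0
        self.last_change_at = None

    def poke(self, current_count, now):
        """Return True once the count has stayed the same for inactivity_period."""
        if self.last_change_at is None or current_count != self.previous_count:
            # First poke, or the count changed: restart the inactivity timer.
            self.previous_count = current_count
            self.last_change_at = now
            return False
        # Count is unchanged: succeed only if it is non-zero and has been quiet long enough.
        return current_count > 0 and now - self.last_change_at >= self.inactivity_period


tracker = UnchangedTracker(inactivity_period=120)
# (seconds since start, number of objects under the prefix), made-up observations
observations = [(0, 1), (60, 3), (120, 3), (180, 3)]
results = [tracker.poke(count, now) for now, count in observations]
```

The count changes at 60 seconds and then stays at 3, so only the poke at 180 seconds (120 seconds after the last change) returns True. The real sensor handles more cases than this sketch does.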

4)S3PrefixSensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/s3_prefix/index.html#airflow.providers.amazon.aws.sensors.s3_prefix.S3PrefixSensor

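* Waits until something matching the specified prefix exists.
=> A prefix is the leading part of a key, so the check works like glob airfl* or SQL LIKE 'airfl%' (the prefix itself is a plain string such as airfl, not a pattern)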
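
Since a prefix is just the leading part of a key, the check can be pictured as a starts-with test over the keys in the bucket. The following is a minimal conceptual sketch with made-up keys; prefix_exists is a hypothetical helper, not an Airflow API, and it ignores the delimiter handling that the real sensor performs.

```python
def prefix_exists(keys, prefix):
    """True if at least one key starts with prefix (hypothetical helper)."""
    return any(k.startswith(prefix) for k in keys)


keys_in_bucket = ["data/raw/green/tripdata.csv", "scripts/job.py"]  # made-up listing
found = prefix_exists(keys_in_bucket, "data/raw/green")
missing = prefix_exists(keys_in_bucket, "data/raw/yellow")
```

Here found is True and missing is False, which mirrors a sensor that succeeds for data/raw/green and keeps waiting for data/raw/yellow.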

Sample
https://amazon-mwaa-for-analytics.workshop.aws/en/workshop-2.0.2/m1-processing/s3.html

from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor

S3_BUCKET_NAME = "your-s3-bucket-name"

s3_sensor = S3PrefixSensor(
  task_id='s3_sensor',
  bucket_name=S3_BUCKET_NAME,
  prefix='data/raw/green',
  dag=dag
)

【2】Usage notes

* airflow.sensors.s3_key_sensor / airflow.sensors.s3_prefix_sensor are
 deprecated in Airflow v2

https://airflow.apache.org/docs/apache-airflow/2.0.2/_api/airflow/sensors/s3_key_sensor/index.html
https://airflow.apache.org/docs/apache-airflow/2.0.2/_api/airflow/sensors/s3_prefix_sensor/index.html

Excerpt from the pages above:
~~~~~~~~~
airflow.sensors.s3_key_sensor

This module is deprecated.
Please use airflow.providers.amazon.aws.sensors.s3_key.
~~~~~~~~~

Related articles

MWAA ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2021/09/29/131101
Apache Airflow ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2021/07/18/004531
Apache Airflow ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Miscellaneous ~
https://dk521123.hatenablog.com/entry/2021/09/30/163020