【Airflow】Apache Airflow ~ Amazon Athena ~

◾️Introduction

This post looks at how to work with Amazon Athena from Apache Airflow.

Table of contents

【1】Amazon Athena in Apache Airflow
【2】Installation
【3】Operator
 1)AthenaOperator
【4】Sensor
 1)AthenaSensor
【5】Sample

【1】Amazon Athena in Apache Airflow

* The Amazon provider package ships a dedicated Operator and Sensor for Athena

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/athena/athena_boto.html#amazon-athena

【2】Installation

pip install 'apache-airflow[amazon]'
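
To confirm the install worked, one option is to look up the provider distribution with the standard library. This is a minimal sketch, assuming the `amazon` extra pulls in the `apache-airflow-providers-amazon` distribution; the helper name `provider_version` is hypothetical.

```python
from importlib import metadata
from typing import Optional


def provider_version(dist_name: str = "apache-airflow-providers-amazon") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = provider_version()
    print(version if version else "Amazon provider is not installed")
```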

【3】Operator

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/athena/athena_boto.html#operators

1)AthenaOperator

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/athena/index.html

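from airflow.providers.amazon.aws.operators.athena import AthenaOperator

# Runs the query on Athena; results are written under output_location in S3
read_table = AthenaOperator(
    task_id="read_table",
    query=query_read_table,
    database=athena_database,
    output_location=f"s3://{s3_bucket}/",
)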
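
Athena writes query results under `output_location`, so it can help to keep them under a dedicated prefix rather than the bucket root. The sketch below builds such a location string; `build_output_location` and the `athena-results` prefix are hypothetical names used for illustration and are not part of the provider.

```python
def build_output_location(bucket: str, prefix: str = "") -> str:
    """Build an s3:// URI ending in '/' for use as output_location."""
    bucket = bucket.strip("/")
    if not bucket:
        raise ValueError("bucket must not be empty")
    prefix = prefix.strip("/")
    if prefix:
        return f"s3://{bucket}/{prefix}/"
    return f"s3://{bucket}/"


if __name__ == "__main__":
    print(build_output_location("your-s3-bucket", "athena-results"))
```

The returned string could then be passed as `output_location=build_output_location(s3_bucket, "athena-results")`.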

【4】Sensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/athena/athena_boto.html#sensors

* For details on Sensors, see the related article below

Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751

1)AthenaSensor

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/athena/index.html

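from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

# Waits for the query started by read_table; its execution ID arrives via XCom
await_query = AthenaSensor(
    task_id="await_query",
    query_execution_id=read_table.output,
)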
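
Conceptually, the sensor keeps checking the query state until it reaches a terminal one. The following is a rough sketch of that polling loop in plain Python, not the provider's actual implementation; `wait_for_query` and the `get_state` callable are hypothetical stand-ins for a call to Athena. The state names are the ones Athena reports for a query execution (QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED).

```python
import time
from typing import Callable

SUCCESS_STATES = {"SUCCEEDED"}
FAILURE_STATES = {"FAILED", "CANCELLED"}


def wait_for_query(
    get_state: Callable[[], str],
    sleep_time: float = 1.0,
    max_tries: int = 10,
) -> str:
    """Poll get_state() until the query succeeds; raise on failure or timeout."""
    for _ in range(max_tries):
        state = get_state()
        if state in SUCCESS_STATES:
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Athena query ended in state {state}")
        # Still QUEUED or RUNNING: wait before checking again.
        time.sleep(sleep_time)
    raise TimeoutError(f"query did not finish after {max_tries} checks")


if __name__ == "__main__":
    # Fake state source standing in for Athena: two pending checks, then success.
    states = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
    print(wait_for_query(lambda: next(states), sleep_time=0))
```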

【5】Sample

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/tests/system/amazon/aws/example_athena.html

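from datetime import timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.utils.dates import days_ago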


s3_bucket = "your-s3-bucket"
athena_table = "demo_table"
athena_database = "demo_db"

query_read_table = f"SELECT * from {athena_database}.{athena_table}"

default_args = {
  'owner': 'your-name',
  'depends_on_past': False,
  'email': ['your-email@gmail.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
}

with DAG(
  'hello_world',
  default_args=default_args,
  description='This is a simple demo DAG for Hello World',
  schedule_interval=timedelta(days=1),
  start_date=days_ago(2),
  tags=['hello_world'],
) as dag:

  read_table = AthenaOperator(
    task_id="read_table",
    query=query_read_table,
    database=athena_database,
    output_location=f"s3://{s3_bucket}/",
  )

  # Seconds the operator waits between query status checks
  read_table.sleep_time = 1

  await_query = AthenaSensor(
    task_id="await_query",
    query_execution_id=read_table.output,
  )

  read_table >> await_query
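
Once the query succeeds, Athena stores a SELECT result as a CSV object named after the query execution ID under the output location. The sketch below only derives that object URI as a string; `result_object_uri` is a hypothetical helper and the execution ID shown is a made-up placeholder.

```python
def result_object_uri(output_location: str, query_execution_id: str) -> str:
    """Return the expected S3 URI of the CSV result for a query execution."""
    if not output_location.startswith("s3://"):
        raise ValueError("output_location must start with s3://")
    if not query_execution_id:
        raise ValueError("query_execution_id must not be empty")
    # Normalise the trailing slash so the parts join with exactly one '/'.
    return f"{output_location.rstrip('/')}/{query_execution_id}.csv"


if __name__ == "__main__":
    # Placeholder ID for illustration only.
    print(result_object_uri("s3://your-s3-bucket/", "example-execution-id"))
```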

Related articles

Apache Airflow ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751