◾️はじめに
Apache Airflow での Amazon Athenaを扱うことを考える
目次
【1】Apache Airflow での Amazon Athena 【2】インストール 【3】Operator 1)AthenaOperator 【4】Sensor 1)AthenaSensor 【5】サンプル
【1】Apache Airflow での Amazon Athena
* 専用のOperator・Sensorが用意されている
【2】インストール
pip install 'apache-airflow[amazon]'
【3】Operator
1)AthenaOperator
read_table = AthenaOperator(
task_id="read_table",
query=query_read_table,
database=athena_database,
output_location=f"s3://{s3_bucket}/",
)
【4】Sensor
* Sensor の詳細については、以下の関連記事を参照のこと
Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751
1)AthenaSensor
await_query = AthenaSensor(
task_id="await_query",
query_execution_id=read_table.output,
)
【5】サンプル
from datetime import timedelta from airflow import DAG from airflow.utils.dates import days_ago s3_bucket = "your-s3-bucket" athena_table = "demo_table" athena_database = "demo_db" query_read_table = f"SELECT * from {athena_database}.{athena_table}" default_args = { 'owner': 'your-name', 'depends_on_past': False, 'email': ['your-email@gmail.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG( 'hello_world', default_args=default_args, description='This is a simple demo DAG for Hello World', schedule_interval=timedelta(days=1), start_date=days_ago(2), tags=['hello_world'], ) as dag: read_table = AthenaOperator( task_id="read_table", query=query_read_table, database=athena_database, output_location=f"s3://{s3_bucket}/", ) read_table.sleep_time = 1 await_query = AthenaSensor( task_id="await_query", query_execution_id=read_table.output, ) read_table >> await_query
関連記事
Apache Airflow ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/09/28/135510
Apache Airflow ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2021/07/24/233012
Apache Airflow ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2021/07/28/234319
Apache Airflow ~ Sensor ~
https://dk521123.hatenablog.com/entry/2023/10/30/002751