【AWS】Athena ~ boto3 ~

◾️Introduction

This post is a follow-up to:

https://dk521123.hatenablog.com/entry/2020/06/17/173717
https://dk521123.hatenablog.com/entry/2023/10/13/144254

This time, it is a memo on the boto3 API for AWS Athena.

【1】boto3

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html

import boto3

# The service name must be lowercase ('athena')
client = boto3.client('athena')

1)start_query_execution

* Runs a SQL query

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/start_query_execution.html

response = client.start_query_execution(
    QueryString='SELECT * FROM demo_schema.demo_table;',
    WorkGroup='demo_wg',
)
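
As a small sketch of how the call above might be wrapped, the helper below starts a query and returns the QueryExecutionId that the later calls need. The function name `start_query` and its arguments are hypothetical (not from the original memo), and `client` is assumed to be the boto3 Athena client created above. `ExecutionParameters` is the optional argument that fills `?` placeholders in the query in order; only a numeric value is used here to keep the example simple.

```python
def start_query(client, query, workgroup, params=None):
    """Start an Athena query and return its QueryExecutionId.

    `client` is assumed to be boto3.client('athena').
    `params` (optional) are bound to the `?` placeholders in `query`, in order.
    """
    kwargs = {"QueryString": query, "WorkGroup": workgroup}
    if params:
        # ExecutionParameters expects a list of strings
        kwargs["ExecutionParameters"] = [str(p) for p in params]
    response = client.start_query_execution(**kwargs)
    return response["QueryExecutionId"]


# Usage (assumes a real client and an existing workgroup):
# query_id = start_query(
#     client,
#     "SELECT * FROM demo_schema.demo_table WHERE age > ?",
#     "demo_wg",
#     params=[20],
# )
```

Passing the client in as an argument makes it easy to swap in a stub when testing without AWS access.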

2)get_query_execution

* Checks the execution status of a query

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/get_query_execution.html

response = client.get_query_execution(
    QueryExecutionId='string'
)
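
The response is a nested dict, and the state lives under `QueryExecution` -> `Status` -> `State`. Below is a minimal sketch of reading it; the helper name `summarize_status` is hypothetical. The states are QUEUED, RUNNING, SUCCEEDED, FAILED, and CANCELLED, and `StateChangeReason` may be absent, so it is read with `.get()`.

```python
# States after which the query will not change any more
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def summarize_status(response):
    """Return (state, is_finished, reason) from a get_query_execution response."""
    status = response["QueryExecution"]["Status"]
    state = status["State"]
    # StateChangeReason is typically present only when there is something to report
    reason = status.get("StateChangeReason")
    return state, state in TERMINAL_STATES, reason


# Usage (assumes `client` is boto3.client('athena')):
# state, finished, reason = summarize_status(
#     client.get_query_execution(QueryExecutionId=query_execution_id)
# )
```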

3)get_query_results

* Fetches the query results

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena/client/get_query_results.html

response = client.get_query_results(
    QueryExecutionId='string',
    NextToken='string',  # optional; token from the previous response when paging
    MaxResults=123,  # optional
    QueryResultType='DATA_ROWS'  # optional; 'DATA_MANIFEST' or 'DATA_ROWS'
)
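
A single call returns only one page, so larger results need `NextToken` handling. One way to avoid writing that loop by hand is the boto3 paginator for `get_query_results`. The sketch below is only an illustration: `fetch_all_rows` is a hypothetical name, and it assumes a SELECT query, where the first row of the first page holds the column names.

```python
def fetch_all_rows(client, query_execution_id):
    """Collect every page of results and return (header, rows).

    Assumes a SELECT query, where the first returned row is the header.
    NULL values have no 'VarCharValue' key, so they become None.
    """
    paginator = client.get_paginator("get_query_results")
    header = None
    rows = []
    for page in paginator.paginate(QueryExecutionId=query_execution_id):
        for row in page["ResultSet"]["Rows"]:
            values = [col.get("VarCharValue") for col in row["Data"]]
            if header is None:
                header = values  # first row overall = column names
            else:
                rows.append(values)
    return header, rows


# Usage (assumes `client` is boto3.client('athena') and the query has SUCCEEDED):
# header, rows = fetch_all_rows(client, query_execution_id)
```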

【2】Sample

Example 1:

import boto3
import time
import pandas as pd

REGION = 'us-west-2'
BUCKET_NAME = 'your-s3-bucket1'
OUTPUT_BUCKET = 'your-s3-bucket2'
DATABASE = 'demo_db'
TABLE_NAME = 'demo_table'
COLNAMES = ['id', 'name', 'age']

QUERY = f"SELECT {','.join(COLNAMES)} FROM {TABLE_NAME};"

client = boto3.client('athena', region_name=REGION)

# 1. start_query_execution
response = client.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={
        'Database': DATABASE
    },
    ResultConfiguration={
        'OutputLocation': f's3://{OUTPUT_BUCKET}/athena-query-results/'
    }
)
query_execution_id = response['QueryExecutionId']

# 2. get_query_execution
while True:
    response = client.get_query_execution(QueryExecutionId=query_execution_id)
    status = response['QueryExecution']['Status']['State']
    if status == 'SUCCEEDED':
        break
    elif status == 'FAILED' or status == 'CANCELLED':
        raise Exception(f"Query failed or was cancelled: {response}")
    else:
        time.sleep(0.5)

# 3. get_query_results
result_response = client.get_query_results(
    QueryExecutionId=query_execution_id
)
# The first row holds the column names; the remaining rows hold the data
header = [col.get('VarCharValue', None) for col in result_response['ResultSet']['Rows'][0]['Data']]
data = [[col.get('VarCharValue', None) for col in row['Data']]
        for row in result_response['ResultSet']['Rows'][1:]]

df_result = pd.DataFrame(data, columns=header)
print(df_result.head())
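
The `while True` loop in step 2 never gives up if the query stays QUEUED or RUNNING. As a variation, here is a minimal sketch of the same polling with a timeout. The name `wait_for_query` and the default values are assumptions made for illustration, not part of boto3.

```python
import time


def wait_for_query(client, query_execution_id, timeout_sec=60.0, interval_sec=0.5):
    """Poll get_query_execution until the query succeeds.

    Raises RuntimeError if the query FAILED or was CANCELLED,
    and TimeoutError if it has not finished within timeout_sec.
    """
    deadline = time.monotonic() + timeout_sec
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return response
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Query {state}: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Query {query_execution_id} is still {state}")
        time.sleep(interval_sec)


# Usage (assumes `client` and `query_execution_id` from the sample above):
# wait_for_query(client, query_execution_id, timeout_sec=120)
```

Note that timing out here only stops the waiting; the query itself keeps running on the Athena side unless it is stopped separately.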

References

https://qiita.com/c60evaporator/items/2695f0ceeccbbda1bac9

Related articles

Athena ~ Introduction ~
https://dk521123.hatenablog.com/entry/2020/06/17/173717
Athena ~ AWS CLI
https://dk521123.hatenablog.com/entry/2023/10/13/144254
dbt-athena ~ Introduction ~
https://dk521123.hatenablog.com/entry/2025/11/14/152128