【Snowflake】Snowflake ~ Running Queries with Python ~

■ Introduction

I needed to run asynchronous Snowflake queries from Python,
so I'm using this as an opportunity to learn the Snowflake Connector for Python.

Contents

【1】Snowflake Connector for Python
【2】Setup
【3】Synchronous query execution
 1) Sample
 2) Related APIs
【4】Asynchronous query execution
 1) Sample
 2) Related APIs

【1】Snowflake Connector for Python

* A Python interface for connecting to Snowflake; it implements the Python Database API v2.0 specification (PEP 249)

https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector

【2】Setup

pip install snowflake-connector-python

https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector-install
Notes

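* Requires Python 3.7 or later (as of this writing; newer connector releases have raised the minimum, so check the installation page above)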
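To check both requirements before writing any query code, a small script like the following can help. This is a minimal sketch: the helper names are just for illustration, and it assumes Python 3.8+ because `importlib.metadata` is not available on 3.7 (there, `pip show snowflake-connector-python` does the same job).

```python
import sys
from importlib import metadata
from typing import Optional, Tuple


def python_version_ok(minimum: Tuple[int, int] = (3, 7)) -> bool:
    """Return True if the running interpreter meets the minimum version."""
    return sys.version_info[:2] >= minimum


def connector_version() -> Optional[str]:
    """Return the installed snowflake-connector-python version, or None."""
    try:
        return metadata.version("snowflake-connector-python")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("Python OK:", python_version_ok())
    print("snowflake-connector-python:", connector_version() or "not installed")
```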

【3】Synchronous query execution

https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector-example#performing-a-synchronous-query

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
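These three lines follow the Python DB API 2.0 (PEP 249) pattern that the connector implements: connect, get a cursor, execute, then iterate or fetch. To try the same flow locally without a Snowflake account, here is a stand-in using the standard library's `sqlite3` module. It is not Snowflake, and the table and rows are made up for illustration. One difference to keep in mind: `sqlite3` uses `?` placeholders, whereas the Snowflake connector defaults to `%s`.

```python
import sqlite3

# Stand-in for Snowflake: sqlite3 also follows DB API 2.0 (PEP 249).
# NOTE: sqlite3 uses "?" (qmark) placeholders; the Snowflake connector
# defaults to "%s" (pyformat).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE person (id TEXT, name TEXT, age INTEGER)")
cur.executemany(
    "INSERT INTO person VALUES (?, ?, ?)",
    [("x0001", "Alice", 30), ("x0002", "Bob", 25)],
)

# Bind parameters are passed as a sequence, just as with the Snowflake cursor
cur.execute("SELECT * FROM person WHERE id = ?", ("x0001",))
rows = cur.fetchall()
print(rows)  # [('x0001', 'Alice', 30)]
conn.close()
```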

1) Sample

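import snowflake.connector

with snowflake.connector.connect(
    user="user",
    password="password",
    account="account",
    warehouse="warehouse",
    database="database",
    schema="schema",
    role="role",
) as conn:
    with conn.cursor() as cursor:
        target_id = "x0001"
        # The default paramstyle is "pyformat", so use %s placeholders and pass
        # the values as a sequence. To use "?" instead, set
        # snowflake.connector.paramstyle = "qmark" before calling connect().
        cursor.execute("SELECT * FROM person WHERE id = %s LIMIT 3", (target_id,))
        for row in cursor:
            print(type(row))  # <class 'tuple'> with the default cursor
            person_id, name, age = row
            print(f"id={person_id}, name={name}, age={age}")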
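The sample hardcodes credentials for brevity. One way to keep them out of source code is to read them from environment variables and unpack them into `connect()`. A minimal sketch follows; the `SNOWFLAKE_*` variable names and the helper `load_connection_params` are hypothetical choices made here, and the values are passed to `connect()` explicitly.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment variable names; choose your own.
_ENV_KEYS = {
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "account": "SNOWFLAKE_ACCOUNT",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
    "role": "SNOWFLAKE_ROLE",
}
_REQUIRED = ("user", "password", "account")


def load_connection_params(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build keyword arguments for snowflake.connector.connect() from env vars.

    Optional settings that are unset are left out; missing required ones raise.
    """
    env = os.environ if env is None else env
    params = {key: env[var] for key, var in _ENV_KEYS.items() if env.get(var)}
    missing = [key for key in _REQUIRED if key not in params]
    if missing:
        raise KeyError(f"missing connection settings: {', '.join(missing)}")
    return params


# Usage (requires the connector and real credentials):
#   import snowflake.connector
#   with snowflake.connector.connect(**load_connection_params()) as conn:
#       ...
```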

2) Related APIs

cursor.execute

* Executes a query synchronously: the call blocks until the query completes

execute_string(sql_text, remove_comments=False, return_cursors=True)

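* Executes one or more SQL statements passed as a single string (a Connection method); returns a list of cursors, one per statement, in the order the statements appear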

https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector-api#object-connection

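# conn: a connection returned by snowflake.connector.connect()
cursor_list = conn.execute_string(
    "SELECT * FROM testtable WHERE col1 LIKE 'T%';"
    "SELECT * FROM testtable WHERE col2 LIKE 'A%';"
)

# One cursor is returned per statement, in order
for cursor in cursor_list:
    for row in cursor:
        print(row[0], row[1])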
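Because `execute_string()` returns one cursor per statement, a small wrapper can gather every statement's rows into one list per statement. This is a sketch: `run_script` is a hypothetical helper, `conn` is assumed to be a connection from `snowflake.connector.connect()`, and it passes `remove_comments=True` so that SQL comments in the script are stripped before execution.

```python
from typing import Any, List, Sequence


def run_script(conn: Any, sql_text: str) -> List[List[Sequence[Any]]]:
    """Run every statement in sql_text and collect each statement's rows.

    execute_string() returns one cursor per statement, in order; each cursor
    is iterated to materialize its rows.
    """
    cursors = conn.execute_string(sql_text, remove_comments=True)
    return [list(cursor) for cursor in cursors]
```

Keeping the rows grouped per statement preserves which result came from which query, which a single flattened list would lose.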

【4】Asynchronous query execution

https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector-example#performing-an-asynchronous-query

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

1) Sample

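import time

import snowflake.connector
from snowflake.connector import DictCursor

with snowflake.connector.connect(
    user="user",
    password="password",
    account="account",
    warehouse="warehouse",
    database="database",
    schema="schema",
    role="role",
) as conn:
    # DictCursor returns each row as a dict keyed by column name
    with conn.cursor(cursor_class=DictCursor) as cursor:
        # Submit the query and return immediately (example query)
        cursor.execute_async("SELECT * FROM person LIMIT 3")
        # Get the query ID
        query_id = cursor.sfqid
        # Poll until the query finishes; raises an error if the query failed
        while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
            print("waiting query results...")
            time.sleep(1)

        # Attach the finished query's results to this cursor, then fetch them
        cursor.get_results_from_sfqid(query_id)
        print("received query results.")
        results = cursor.fetchall()
        print(results)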
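The polling loop in the sample has no upper bound, so a query that never finishes would block the script forever. Below is a sketch of the same loop with a timeout, wrapped in a hypothetical helper `wait_for_query`. `conn` is assumed to be a connection from `snowflake.connector.connect()`, and the default interval and timeout values are arbitrary.

```python
import time
from typing import Any


def wait_for_query(conn: Any, query_id: str,
                   poll_interval: float = 1.0, timeout: float = 300.0) -> None:
    """Block until the async query finishes, or raise TimeoutError.

    get_query_status_throw_if_error() raises if the query has failed, so a
    normal return means the query is no longer running and did not error.
    """
    deadline = time.monotonic() + timeout
    while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"query {query_id} still running after {timeout}s")
        time.sleep(poll_interval)
```

Usage: call `cursor.execute_async(...)`, then `wait_for_query(conn, cursor.sfqid)`, and finally `cursor.get_results_from_sfqid(cursor.sfqid)` followed by `cursor.fetchall()`. `time.monotonic()` is used for the deadline so that system clock changes do not affect the timeout.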

2) Related APIs

execute_async(...)

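* Submits a query for asynchronous execution (a Cursor method); returns immediately without waiting for the query to finish, and the query ID is then available in cursor.sfqid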

get_query_status_throw_if_error(query_id)

* Returns the status of the query (a Connection method); raises an error if the query has failed

is_still_running(query_status)

* Whether the query is still in progress (True while it is running)

is_an_error(query_status)

* Whether the query ended in an error (True if the status indicates an error)

get_results_from_sfqid(query_id)

* Retrieves the results of an asynchronous query, or of a previously submitted synchronous query (a Cursor method)
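Since `get_query_status_throw_if_error()` raises as soon as the query has failed, `is_an_error()` is mainly useful together with the non-raising `get_query_status(query_id)` connection method. A minimal sketch of a status check that never raises; `check_query` and its return labels are hypothetical, and `conn` is assumed to be a connection from `snowflake.connector.connect()`.

```python
from typing import Any


def check_query(conn: Any, query_id: str) -> str:
    """Classify an async query as 'running', 'error' or 'done' without raising.

    get_query_status() returns the status without raising on failure; the
    status is then tested with is_still_running() and is_an_error().
    """
    status = conn.get_query_status(query_id)
    if conn.is_still_running(status):
        return "running"
    if conn.is_an_error(status):
        return "error"
    # Neither running nor an error: the query has finished (normally SUCCESS)
    return "done"
```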

