■ はじめに
Python で Snowflake の 非同期クエリを実行することになったので これを機にPython用Snowflakeコネクタを学んでみる
目次
【1】Python用Snowflakeコネクタ 【2】環境設定 【3】同期クエリ実行 1)サンプル 2)関連するAPI 【4】非同期クエリ実行 1)サンプル 2)関連するAPI
【1】Python用Snowflakeコネクタ
* Snowflakeに接続するためのPython用インターフェイス
https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector
【2】環境設定
pip install snowflake-connector-python
https://docs.snowflake.com/ja/developer-guide/python-connector/python-connector-install
使用上の注意
* Pythonバージョン3.7以降が必要
【3】同期クエリ実行
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
1)サンプル
import snowflake.connector with snowflake.connector.connect( user="user", password="password", account="account", warehouse="warehouse", database="database", schema="schema", role="role", ) as conn: with conn.cursor() as cursor: target_id = 'x0001' cursor.execute('SELECT * FROM person WHERE id = ? LIMIT 3', target_id) for row in cursor: print(type(row)) id, name, age = row print(f'id={id}, name={name}, age={age}')
2)関連するAPI
cursor.execute
* 同期クエリ実行
execute_string(sql_text, remove_comments=False, return_cursors=True)
* 1つ以上の SQLを実行
cursor_list = connection1.execute_string( "SELECT * FROM testtable WHERE col1 LIKE 'T%';" "SELECT * FROM testtable WHERE col2 LIKE 'A%';" ) for cursor in cursor_list: for row in cursor: print(row[0], row[1])
【3】非同期クエリ実行
conn = snowflake.connector.connect( ... ) cur = conn.cursor() # Submit an asynchronous query for execution. cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
1)サンプル
import snowflake.connector snowflake.connector import DictCursor with snowflake.connector.connect( user="user", password="password", account="account", warehouse="warehouse", database="database", schema="schema", role="role", ) as conn: with conn.cursor(cursor_class=DictCursor) as cursor: cursor.execute_async() # クエリIDを取得 query_id = cursor.sfqid while con.is_still_running(con.get_query_status_throw_if_error(query_id)): print('waiting query results...') time.sleep(1) cur.get_results_from_sfqid(query_id) print('recieved query results.') results = cur.fetchall() print(results)
2)関連するAPI
execute_async(...)
* 非同期クエリ実行
get_query_status_throw_if_error(query_id)
* クエリのステータスを返す
is_still_running(query_status)
* まだ処理中であるか(処理中はTrue)
is_an_error(query_status)
* エラーが発生したか(処理中はTrue)
get_results_from_sfqid(query_id)
* 非同期クエリまたは以前に送信された同期クエリの結果を取得
参考文献
https://dev.classmethod.jp/articles/try-snowflake-connector-for-python-async-query/
https://www.estie.jp/blog/entry/2023/04/21/160000
https://blog.serverworks.co.jp/tech/2020/06/05/snowflake-python-connector/
関連記事
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111