■ Introduction
While working on unloading table data to build test data, I ended up writing a Snowflake script for the first time in quite a while. Since I suspect the specification will keep changing and the rework could get expensive, I'm generating the SQL from Python code instead.
【1】Prerequisites
* A storage integration (STORAGE INTEGRATION) must be created in advance => see the related article below (a rough sketch of the DDL follows after the link)
Snowflake ~ How to Create a Storage Integration ~
https://dk521123.hatenablog.com/entry/2023/07/27/000000
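For reference, the DDL covered in the article above looks roughly like the sketch below. The integration name matches the one used in the sample; the account ID, IAM role, and bucket are placeholders, so replace them with your own values (the full procedure, including the IAM trust relationship, is in the linked article).

# Rough sketch only: the storage integration assumed by the sample below.
# <account-id> and <role-for-unload> are placeholders, not values from this article.
create_storage_integration_sql = """
CREATE STORAGE INTEGRATION storage_integration_for_unload
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/<role-for-unload>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://your-s3-bucket/');
"""
print(create_storage_integration_sql)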
【2】Sample
import codecs

output_file = "unload.sql"
base_db = "demo_db"
base_schema = "demo_schema"
start_date = "2023-08-01"
end_date = "2023-08-03"
target_s3_bucket_name = "your-s3-bucket"
target_integration = "storage_integration_for_unload"
# (db, schema, table, file-name expression used in the generated SQL, delimiter, include header)
target_unload_list = (
    ("demo_db1", "demo_sche1", "demo_table1",
     "'unload_demo_table1' || target_date || '_.txt'", "|", True),
    ("demo_db2", "demo_sche2", "demo_table2",
     "'unload_demo_table2' || target_date || '_.txt'", ",", False),
)


def build_sql(sql: str, target: str = ""):
    return sql + target + "\n"


# Start to generate SQL
sql = ""
sql = build_sql(sql, f"use database {base_db};")
sql = build_sql(sql, f"use schema {base_schema};")
sql = build_sql(sql)
sql = build_sql(sql, "-- Step1: Create each stage of table to unload")
for i, target_table_info in enumerate(target_unload_list):
    number = i + 1
    target_db_name = target_table_info[0]
    target_schema_name = target_table_info[1]
    target_table_name = target_table_info[2]
    target_table = f"{target_db_name}.{target_schema_name}.{target_table_name}"
    print(f"target_table = {target_table}")
    sql = build_sql(sql, f"-- [{number}] {target_table}")
    sql = build_sql(sql, f"CREATE OR REPLACE STAGE {base_db}.{base_schema}.{target_table_name}_unload_stage")
    sql = build_sql(sql, f"  URL='s3://{target_s3_bucket_name}/demo/{target_db_name}/{target_schema_name}/{target_table_name}/'")
    sql = build_sql(sql, f"  STORAGE_INTEGRATION={target_integration};")
    sql = build_sql(sql)

sql = build_sql(sql, "execute immediate $$")
sql = build_sql(sql, "declare")
sql = build_sql(sql, f"  start_date date default date('{start_date}');")
sql = build_sql(sql, f"  end_date date default date('{end_date}');")
sql = build_sql(sql, f"  final_statement varchar default '';")
sql = build_sql(sql, "begin")
sql = build_sql(sql, "  let target_date := start_date;")
sql = build_sql(sql, "  while (target_date <= end_date) do")
sql = build_sql(sql, "    let year := to_varchar(:target_date, 'yyyy');")
sql = build_sql(sql, "    let month := to_varchar(:target_date, 'MM');")
sql = build_sql(sql, "    let day := to_varchar(:target_date, 'dd');")
sql = build_sql(sql, "    let target_date_as_int := to_char(:target_date, 'YYYYMMDD')::int;")

# Step2/Step3: generate a REMOVE and a COPY INTO statement per table inside the while loop
for i, target_table_info in enumerate(target_unload_list):
    number = i + 1
    target_db_name = target_table_info[0]
    target_schema_name = target_table_info[1]
    target_table_name = target_table_info[2]
    target_file_name = target_table_info[3]
    target_delimiter = target_table_info[4]
    has_header = target_table_info[5]
    target_table = f"{target_db_name}.{target_schema_name}.{target_table_name}"
    sql = build_sql(sql, f"    -- ******* [{number}] {target_table} *******")
    sql = build_sql(sql, f"    -- Step2-{number}: Clean data file path if exists")
    sql = build_sql(sql, f"    let to_remove_file_{target_table_name} := 'REMOVE @{base_db}.{base_schema}.{target_table_name}_unload_stage/target_date=' || target_date || '/';")
    sql = build_sql(sql, f"    final_statement := to_remove_file_{target_table_name};")
    sql = build_sql(sql, f"    EXECUTE IMMEDIATE :to_remove_file_{target_table_name};")
    sql = build_sql(sql, f"    -- Step3-{number}: Unload")
    sql = build_sql(sql, f"    let to_unload_file_{target_table_name} := 'COPY INTO @{base_db}.{base_schema}.{target_table_name}_unload_stage/target_date=' || target_date || '/' || {target_file_name} ||")
    sql = build_sql(sql, f"      ' FROM (' ||")
    # Quote the date so the generated predicate becomes insertion_date = 'YYYY-MM-DD'
    sql = build_sql(sql, f"      ' SELECT * FROM {target_table} WHERE insertion_date = \\\'' || target_date || '\\\'' ||")
    sql = build_sql(sql, f"      ' )' ||")
    sql = build_sql(sql, f"      ' FILE_FORMAT = (' ||")
    sql = build_sql(sql, f"      ' TYPE=CSV' ||")
    sql = build_sql(sql, f"      ' FIELD_DELIMITER = \\\'{target_delimiter}\\\'' ||")
    sql = build_sql(sql, f"      ' FIELD_OPTIONALLY_ENCLOSED_BY = \\\'\\\"\\\'' ||")
    sql = build_sql(sql, f"      ' NULL_IF=(\\\'\\\')' ||")
    sql = build_sql(sql, f"      ' COMPRESSION = NONE' ||")
    sql = build_sql(sql, f"      ' )' ||")
    if has_header:
        sql = build_sql(sql, f"      ' HEADER = TRUE' ||")
    sql = build_sql(sql, f"      ' SINGLE=TRUE';")
    sql = build_sql(sql, f"    final_statement := to_unload_file_{target_table_name};")
    sql = build_sql(sql, f"    EXECUTE IMMEDIATE :to_unload_file_{target_table_name};")
    sql = build_sql(sql)

sql = build_sql(sql, "    -- loop end")
sql = build_sql(sql, "    target_date := target_date + interval '1 days';")
sql = build_sql(sql, "  end while;")
sql = build_sql(sql, "  return final_statement;")
sql = build_sql(sql, "end;")
sql = build_sql(sql, "$$")
sql = build_sql(sql, ";")

with codecs.open(output_file, 'w', 'utf-8') as file:
    file.write(sql)

print(f"Done. See {output_file}")
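Each entry of target_unload_list is a positional tuple (db, schema, table, file-name expression, delimiter, header flag), which gets hard to read as fields are added. As a possible refactor (just a sketch, not part of the script above), the fields can be named with a NamedTuple; since a NamedTuple is still a tuple, the index-based access above keeps working unchanged.

from typing import NamedTuple


class UnloadTarget(NamedTuple):
    """One table to unload (hypothetical helper, not in the script above)."""
    db_name: str
    schema_name: str
    table_name: str
    file_name_expr: str  # Snowflake expression concatenated into the COPY INTO path
    delimiter: str
    has_header: bool


target_unload_list = (
    UnloadTarget("demo_db1", "demo_sche1", "demo_table1",
                 "'unload_demo_table1' || target_date || '_.txt'", "|", True),
    UnloadTarget("demo_db2", "demo_sche2", "demo_table2",
                 "'unload_demo_table2' || target_date || '_.txt'", ",", False),
)

for target in target_unload_list:
    # target[2] and target.table_name refer to the same value
    print(f"{target.db_name}.{target.schema_name}.{target.table_name} "
          f"(delimiter={target.delimiter!r}, header={target.has_header})")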
Output example (unload.sql)
use database demo_db;
use schema demo_schema;

-- Step1: Create each stage of table to unload
-- [1] demo_db1.demo_sche1.demo_table1
CREATE OR REPLACE STAGE demo_db.demo_schema.demo_table1_unload_stage
  URL='s3://your-s3-bucket/demo/demo_db1/demo_sche1/demo_table1/'
  STORAGE_INTEGRATION=storage_integration_for_unload;

-- [2] demo_db2.demo_sche2.demo_table2
CREATE OR REPLACE STAGE demo_db.demo_schema.demo_table2_unload_stage
  URL='s3://your-s3-bucket/demo/demo_db2/demo_sche2/demo_table2/'
  STORAGE_INTEGRATION=storage_integration_for_unload;

execute immediate $$
declare
  start_date date default date('2023-08-01');
  end_date date default date('2023-08-03');
  final_statement varchar default '';
begin
  let target_date := start_date;
  while (target_date <= end_date) do
    let year := to_varchar(:target_date, 'yyyy');
    let month := to_varchar(:target_date, 'MM');
    let day := to_varchar(:target_date, 'dd');
    let target_date_as_int := to_char(:target_date, 'YYYYMMDD')::int;
    -- ******* [1] demo_db1.demo_sche1.demo_table1 *******
    -- Step2-1: Clean data file path if exists
    let to_remove_file_demo_table1 := 'REMOVE @demo_db.demo_schema.demo_table1_unload_stage/target_date=' || target_date || '/';
    final_statement := to_remove_file_demo_table1;
    EXECUTE IMMEDIATE :to_remove_file_demo_table1;
    -- Step3-1: Unload
    let to_unload_file_demo_table1 := 'COPY INTO @demo_db.demo_schema.demo_table1_unload_stage/target_date=' || target_date || '/' || 'unload_demo_table1' || target_date || '_.txt' ||
      ' FROM (' ||
      ' SELECT * FROM demo_db1.demo_sche1.demo_table1 WHERE insertion_date = \'' || target_date || '\'' ||
      ' )' ||
      ' FILE_FORMAT = (' ||
      ' TYPE=CSV' ||
      ' FIELD_DELIMITER = \'|\'' ||
      ' FIELD_OPTIONALLY_ENCLOSED_BY = \'\"\'' ||
      ' NULL_IF=(\'\')' ||
      ' COMPRESSION = NONE' ||
      ' )' ||
      ' HEADER = TRUE' ||
      ' SINGLE=TRUE';
    final_statement := to_unload_file_demo_table1;
    EXECUTE IMMEDIATE :to_unload_file_demo_table1;

    -- ******* [2] demo_db2.demo_sche2.demo_table2 *******
    -- Step2-2: Clean data file path if exists
    let to_remove_file_demo_table2 := 'REMOVE @demo_db.demo_schema.demo_table2_unload_stage/target_date=' || target_date || '/';
    final_statement := to_remove_file_demo_table2;
    EXECUTE IMMEDIATE :to_remove_file_demo_table2;
    -- Step3-2: Unload
    let to_unload_file_demo_table2 := 'COPY INTO @demo_db.demo_schema.demo_table2_unload_stage/target_date=' || target_date || '/' || 'unload_demo_table2' || target_date || '_.txt' ||
      ' FROM (' ||
      ' SELECT * FROM demo_db2.demo_sche2.demo_table2 WHERE insertion_date = \'' || target_date || '\'' ||
      ' )' ||
      ' FILE_FORMAT = (' ||
      ' TYPE=CSV' ||
      ' FIELD_DELIMITER = \',\'' ||
      ' FIELD_OPTIONALLY_ENCLOSED_BY = \'\"\'' ||
      ' NULL_IF=(\'\')' ||
      ' COMPRESSION = NONE' ||
      ' )' ||
      ' SINGLE=TRUE';
    final_statement := to_unload_file_demo_table2;
    EXECUTE IMMEDIATE :to_unload_file_demo_table2;

    -- loop end
    target_date := target_date + interval '1 days';
  end while;
  return final_statement;
end;
$$
;
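To actually run the generated unload.sql, one simple option is to hand the file to SnowSQL. The sketch below assumes SnowSQL is installed and that a named connection called my_connection exists in ~/.snowsql/config (the connection name is a placeholder for your own environment).

# Hand the generated file to SnowSQL (assumes SnowSQL is installed and
# "my_connection" is a named connection in ~/.snowsql/config).
import subprocess

result = subprocess.run(
    ["snowsql", "-c", "my_connection", "-f", "unload.sql"],
    check=True,  # raise if SnowSQL exits with a non-zero code
)
print(f"snowsql finished with exit code {result.returncode}")

After the run, LIST @demo_db.demo_schema.demo_table1_unload_stage (and likewise for demo_table2) is handy for checking that files were written under each target_date= prefix.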
Related articles
Snowflake ~ Basics ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ Introduction / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ Data Unload ~
https://dk521123.hatenablog.com/entry/2022/07/04/172738
Snowflake ~ REMOVE Command ~
https://dk521123.hatenablog.com/entry/2022/09/26/150259
Snowflake ~ How to Create a Storage Integration ~
https://dk521123.hatenablog.com/entry/2023/07/27/000000
Snowflake ~ Date and Time ~
https://dk521123.hatenablog.com/entry/2022/06/17/113003
Snowflake ~ Date and Time / Date-Time Retrieval Functions ~
https://dk521123.hatenablog.com/entry/2022/09/02/092144
Snowflake ~ String Operations ~
https://dk521123.hatenablog.com/entry/2022/10/01/000000
Snowflake ~ SEQUENCE ~
https://dk521123.hatenablog.com/entry/2023/07/14/091918
【Snowflake】Stored Procedures ~ Introduction ~
https://dk521123.hatenablog.com/entry/2022/12/11/202904
【Snowflake】Stored Procedures ~ Basics / Loops ~
https://dk521123.hatenablog.com/entry/2022/12/16/143349
File Read / Write
https://dk521123.hatenablog.com/entry/2019/10/07/000000