■ はじめに
https://dk521123.hatenablog.com/entry/2022/12/11/202904
https://dk521123.hatenablog.com/entry/2022/12/16/143349
https://dk521123.hatenablog.com/entry/2022/12/26/211349
などで、何回か渡って、 Snowflake の ストアドプロシージャについて扱った。 今回は、これらの知識を使って、 S3内にパーティション構成でUnloadすることを行う。
目次
【1】今回やりたいこと 【2】ポイント 【3】サンプル 1)使用するテーブルおよびテストデータ 2)ストアド例
【1】今回やりたいこと
* Snowflake 内に格納されているテーブルデータをS3にUnloadする => その際、AWS Athena などを使うことを想定して パーティション構成(.../partition_date=yyyyMMdd/country_code=XX/)で出力する * 冪等性を担保するために、Removeで以前のデータを削除する
【2】ポイント
[1] 実行するSQL文を文字列をJOIN(||)して作成 => その際、「'」は途中改行があっても改行を含めて文字列化できる => 実行するSQL文に「'」がある場合は、「\」でエスケープする [2] [1] のSQL文をEXECUTE IMMEDIATE使って実行する => EXECUTE IMMEDIATE文については、以下の関連記事を参照。
https://dk521123.hatenablog.com/entry/2022/12/17/000217
参考にしたサイトのSQL文
https://community.snowflake.com/s/question/0D53r0000Bwfh3rCQA/unable-to-pass-variable-in-the-file-pattern-section-of-the-copy-command
-- より抜粋 create or replace procedure proc() returns varchar(16777216) language SQL EXECUTE AS OWNER AS $$ DECLARE file_name varchar; BEGIN file_name := '.*[.]gz'; LET sql_string varchar := 'COPY INTO MY_TABLE FROM @tmp file_format=(type=csv field_optionally_enclosed_by=\'"\') pattern= \'' || :file_name || '\''; execute immediate :sql_string; return sql_string; END $$; call proc();
【3】サンプル
* 可読性が物凄く悪いが、ひとまず動くものはできた。。。
1)使用するテーブルおよびテストデータ
-- DB作成 (不要ならSkip) create database your_db; -- Schema 作成 (不要ならSkip) create schema your_schema; -- 使用テーブル create or replace table demo_table ( id varchar, name varchar, country_code varchar, insertion_date date ); insert into demo_table ( id, name, country_code, insertion_date) values -- 2022-03-01 ('X0001', 'Mike', 'US', '2022-02-28'), ('X0002', 'Naomi', 'JP', '2022-02-28'), ('X0003', 'Tom', 'US', '2022-02-28'), ('X0004', 'Sam', 'UK', '2022-02-28'), ('X0005', 'Kevin', 'UK', '2022-02-28'), -- 2022-03-01 ('X0011', 'Noah', 'US', '2022-03-01'), ('X0012', 'Oliver', 'AU', '2022-03-01'), ('X0013', 'Lucas', 'JP', '2022-03-01'), ('X0014', 'Ethan', 'US', '2022-03-01'), ('X0015', 'James', 'UK', '2022-03-01'), -- 2022-03-02 ('X0021', 'Benjamin', 'US', '2022-03-02'), ('X0022', 'Ken', 'JP', '2022-03-02'), ('X0023', 'Smith', 'UK', '2022-03-02') ;
2)ストアド例
use database your_db; use schema your_schema; CREATE OR REPLACE TEMPORARY STAGE your_db.your_schema.demo_temp_stage URL = 's3://your-s3-bucket-name/demo_warehouse/demo_db/demo_table/' STORAGE_INTEGRATION = xxxxx ; execute immediate $$ declare last_query_for_debug varchar default 'Done'; start_date date default date('2022-02-28'); end_date date default date('2022-03-02'); begin begin let target_date := start_date; -- WHILE Loop (end_dateまでループする) while (target_date <= end_date) do -- To Remove let remove_statement varchar := 'remove @your_db.your_schema.demo_temp_stage/partition_date=' || target_date || '/'; -- 実行 last_query_for_debug := remove_statement; execute immediate :remove_statement; -- country_code での Loop let result_set resultset := (select country_code from demo_table where insertion_date= :target_date group by country_code); let cursor_for_result_set cursor for result_set; for row_variable in cursor_for_result_set do -- To Copy into for unloading (凄く可読性が悪いが、、、) let copy_into_statement varchar := 'copy into @your_db.your_schema.demo_temp_stage/partition_date=' || target_date || '/country_code=' || row_variable.country_code || '/' || ' from ( select * from demo_table where insertion_date=\'' || target_date || '\' and country_code=\'' || row_variable.country_code || '\'' || ' ) file_format = ( type=csv, field_delimiter=\'\\t\', compression = gzip, field_optionally_enclosed_by=\'"\', null_if = (\'\') ) header=true max_file_size=4900000000;'; -- 実行 last_query_for_debug := copy_into_statement; execute immediate :copy_into_statement; end for; target_date := target_date + interval '1 days'; end while; return last_query_for_debug; end; $$ ;
関連記事
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
【Snowflake】ストアド ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2022/12/11/202904
【Snowflake】ストアド ~ 基本編 / ループ ~
https://dk521123.hatenablog.com/entry/2022/12/16/143349
【Snowflake】ストアド ~ 基本編 / 条件分岐 ~
https://dk521123.hatenablog.com/entry/2022/12/26/211349
【Snowflake】ストアド ~ 例外 / EXCEPTION ~
https://dk521123.hatenablog.com/entry/2022/12/23/223345
【Snowflake】ストアド ~ EXECUTE IMMEDIATE ~
https://dk521123.hatenablog.com/entry/2022/12/17/000217
【Snowflake】ストアド ~ デバッグログについて ~
https://dk521123.hatenablog.com/entry/2022/12/18/121334
【Snowflake】ストアド ~ ステージ内でデータ0件の場合エラーにする ~
https://dk521123.hatenablog.com/entry/2022/12/29/175848