【Snowflake】ストアド ~ S3内にパーティション構成でUnloadする ~

■ はじめに

https://dk521123.hatenablog.com/entry/2022/12/11/202904
https://dk521123.hatenablog.com/entry/2022/12/16/143349
https://dk521123.hatenablog.com/entry/2022/12/26/211349

などで、何回か渡って、
Snowflake の ストアドプロシージャについて扱った。

 今回は、これらの知識を使って、
S3内にパーティション構成でUnloadすることを行う。

目次

【1】今回やりたいこと
【2】ポイント
【3】サンプル
 1)使用するテーブルおよびテストデータ
 2)ストアド例

【1】今回やりたいこと

* Snowflake 内に格納されているテーブルデータをS3にUnloadする
 => その際、AWS Athena などを使うことを想定して
  パーティション構成(.../partition_date=yyyyMMdd/country_code=XX/)で出力する

* 冪等性を担保するために、Removeで以前のデータを削除する

【2】ポイント

[1] 実行するSQL文を文字列をJOIN(||)して作成
 => その際、「'」は途中改行があっても改行を含めて文字列化できる
 => 実行するSQL文に「'」がある場合は、「\」でエスケープする

[2] [1] のSQL文をEXECUTE IMMEDIATE使って実行する
 => EXECUTE IMMEDIATE文については、以下の関連記事を参照。

https://dk521123.hatenablog.com/entry/2022/12/17/000217

参考にしたサイトのSQL
https://community.snowflake.com/s/question/0D53r0000Bwfh3rCQA/unable-to-pass-variable-in-the-file-pattern-section-of-the-copy-command

-- より抜粋

create or replace procedure proc()
returns varchar(16777216)
language SQL
EXECUTE AS OWNER
AS $$
DECLARE
file_name varchar;
BEGIN
file_name := '.*[.]gz';
 
LET sql_string varchar := 'COPY INTO MY_TABLE
  FROM @tmp
  file_format=(type=csv field_optionally_enclosed_by=\'"\')
  pattern= \'' || :file_name || '\'';
 
execute immediate :sql_string;
return sql_string;
END
$$;
 
call proc();

【3】サンプル

* 可読性が物凄く悪いが、ひとまず動くものはできた。。。

1)使用するテーブルおよびテストデータ

-- DB作成 (不要ならSkip)
create database your_db;
-- Schema 作成 (不要ならSkip)
create schema your_schema;
-- 使用テーブル
create or replace table demo_table (
  id varchar,
  name varchar,
  country_code varchar,
  insertion_date date
);

insert into demo_table (
  id, name, country_code, insertion_date) values
    -- 2022-03-01
    ('X0001', 'Mike', 'US', '2022-02-28'),
    ('X0002', 'Naomi', 'JP', '2022-02-28'),
    ('X0003', 'Tom', 'US', '2022-02-28'),
    ('X0004', 'Sam', 'UK', '2022-02-28'),
    ('X0005', 'Kevin', 'UK', '2022-02-28'),
    -- 2022-03-01
    ('X0011', 'Noah', 'US', '2022-03-01'),
    ('X0012', 'Oliver', 'AU', '2022-03-01'),
    ('X0013', 'Lucas', 'JP', '2022-03-01'),
    ('X0014', 'Ethan', 'US', '2022-03-01'),
    ('X0015', 'James', 'UK', '2022-03-01'),
    -- 2022-03-02
    ('X0021', 'Benjamin', 'US', '2022-03-02'),
    ('X0022', 'Ken', 'JP', '2022-03-02'),
    ('X0023', 'Smith', 'UK', '2022-03-02')
;

2)ストアド例

use database your_db;
use schema your_schema;

CREATE OR REPLACE TEMPORARY STAGE your_db.your_schema.demo_temp_stage
  URL = 's3://your-s3-bucket-name/demo_warehouse/demo_db/demo_table/'
  STORAGE_INTEGRATION = xxxxx
;

execute immediate $$
declare
  last_query_for_debug varchar default 'Done';
  start_date date default date('2022-02-28');
  end_date date default date('2022-03-02');
begin
begin
  let target_date := start_date;

  -- WHILE Loop (end_dateまでループする)
  while (target_date <= end_date) do
    -- To Remove
    let remove_statement varchar := 'remove @your_db.your_schema.demo_temp_stage/partition_date=' || target_date || '/';
    -- 実行
    last_query_for_debug := remove_statement;
    execute immediate :remove_statement;

    -- country_code での Loop
    let result_set resultset := (select country_code from demo_table where insertion_date= :target_date group by country_code);
    let cursor_for_result_set cursor for result_set;
    for row_variable in cursor_for_result_set do
      -- To Copy into for unloading (凄く可読性が悪いが、、、)
      let copy_into_statement varchar := 'copy into @your_db.your_schema.demo_temp_stage/partition_date=' || target_date || '/country_code=' || row_variable.country_code || '/' ||
        ' from (
           select * from demo_table where insertion_date=\'' || target_date || '\' and country_code=\'' || row_variable.country_code || '\'' ||
        ' )
        file_format = ( type=csv, field_delimiter=\'\\t\', compression = gzip, field_optionally_enclosed_by=\'"\', null_if = (\'\') )
        header=true
        max_file_size=4900000000;';

      -- 実行
      last_query_for_debug  := copy_into_statement;
      execute immediate :copy_into_statement;
    end for;

    target_date := target_date + interval '1 days';
  end while;
  return last_query_for_debug;
end;
$$
;

関連記事

Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake】ストアド ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2022/12/11/202904
Snowflake】ストアド ~ 基本編 / ループ ~
https://dk521123.hatenablog.com/entry/2022/12/16/143349
Snowflake】ストアド ~ 基本編 / 条件分岐 ~
https://dk521123.hatenablog.com/entry/2022/12/26/211349
Snowflake】ストアド ~ 例外 / EXCEPTION ~
https://dk521123.hatenablog.com/entry/2022/12/23/223345
Snowflake】ストアド ~ EXECUTE IMMEDIATE ~
https://dk521123.hatenablog.com/entry/2022/12/17/000217
Snowflake】ストアド ~ デバッグログについて ~
https://dk521123.hatenablog.com/entry/2022/12/18/121334
Snowflake】ストアド ~ ステージ内でデータ0件の場合エラーにする ~
https://dk521123.hatenablog.com/entry/2022/12/29/175848