■ はじめに
ステージからS3にファイルを読み込んで、 そのファイルデータをSnowflakeにロードするって処理がある。 その際にファイルがなかったら、意図的にエラーにして処理を 中断させることを考える (なんかオプションがあればいいんだけど、探したけどなかった。)
目次
【1】ポイント 【2】サンプル 1)テストデータ「test.csv」 2)フォルダ構成例 3)SQL例 【3】汎用的に使うためにストアド化する 1)SQL例 【4】補足:ステージのオプション 1)ON_ERROR 2)SIZE_LIMIT = num
【1】ポイント
正直、以前やった
https://dk521123.hatenablog.com/entry/2022/12/23/223345
のサンプルで9割ほど完成しているのだが、ポイントをあげておく。 ~~~~~~~~~~~~~ [1] ステージしたデータが0件だったら、raise で例外を発生させる [2] その際に、EXCEPTION ブロックでキャッチしない ~~~~~~~~~~~~~ => 使い回したい場合は、ストアド関数化すればいい (「【3】汎用的に使うためにストアド化する」を参照のこと)
【2】サンプル
1)テストデータ「test.csv」
"id","name","age" "X001","Mike","20" "X002","Kevin","32" "X003","Sam","78"
2)フォルダ構成例
s3://your-s3-bucket-name/test/ date=20221229/ <= Fileあり(エラーを出さずにLoad処理したい) test.csv date=20221230/ <= Fileなし(エラーを出したい)
3)SQL例
use database your_db; use schema your_schema; -- ステージ (URLを切り替えてテスト) CREATE OR REPLACE TEMPORARY STAGE demo_stage URL = 's3://your-s3-bucket-name/test/date=20221230/' -- URL = 's3://your-s3-bucket-name/test/date=20221219/' STORAGE_INTEGRATION = xxxxx FILE_FORMAT = (TYPE=CSV SKIP_HEADER=1 FILE NULL_IF=('') -- FILE_FORMAT = (TYPE=CSV FILE NULL_IF=('') -- ファイルが全くの空をチェックする場合 ; -- 件数チェック処理 execute immediate $$ declare row_count integer default 0; zero_count_exception exception (-20404, 'No data'); begin let result_set resultset := (select count(*) as row_count from @demo_stage); let cursor_for_result_set cursor for result_set; for row_variable in cursor_for_result_set do let row_count number := row_variable.row_count; if (row_count <= 0) then raise zero_count_exception; -- データ0件の場合エラーにする else return row_count; end if; end for; end; $$ ; -- 以降は、ロード処理 -- ロード用テーブル作成 CREATE TABLE test_table IF NOT EXISTS ( id VARCHAR, name VARCHAR, age NUMBER ); -- ロード処理 COPY INTO test_table FROM ( SELECT $1, $2, $3 FROM @demo_stage );
【3】汎用的に使うためにストアド化する
* 公式ドキュメントの以下を参考に 「【2】サンプル」の「3)SQL例」を 汎用的に使うためにストアド化する
クエリに対して SQL の文字列を動的に作成する必要がある場合は、 query を (EXECUTE IMMEDIATE string_of_sql) に設定します。
1)SQL例
* create procedure の構文は、以下の公式ドキュメントを参照
https://docs.snowflake.com/ja/sql-reference/sql/create-procedure.html
1) ストアド例
create or replace procedure validate_existing_data(table_name varchar) returns number language sql execute as caller comment = 'To check existing data. if no data, raise zero_count_exception exception.' as $$ declare count_query varchar default 'select count(*) as row_count from ' || :table_name || ';'; row_count integer default 0; zero_count_exception exception (-20404, 'No data'); begin let result_set resultset := (execute immediate :count_query); let cursor_for_result_set cursor for result_set; for row_variable in cursor_for_result_set do let row_count number := row_variable.row_count; if (row_count <= 0) then raise zero_count_exception; -- データ0件の場合エラーにする else return row_count; end if; end for; end; $$ ;
呼び出し例
-- 件数チェック処理 call validate_existing_data('@demo_stage');
【4】補足:ステージのオプション
冒頭で軽く述べた通り、 初め、オプションでエラーを発生できないかなっと調査していた。 で、結局、そういうオプションは見当たらなったが、 探している時に面白そうなオプションがあったので、メモ。
https://docs.snowflake.com/ja/sql-reference/sql/create-stage.html#copy-options-copyoptions
1)ON_ERROR
* ロード操作のエラー処理を指定する => こちらは、エラーになった場合の操作
# | 設定値 | 説明 | メモ |
---|---|---|---|
1 | CONTINUE | エラーが見つかってもロード続行 | |
2 | SKIP_FILE | エラーだったらファイルをスキップ | Snowpipe時のデフォルト |
3 | SKIP_FILE_[num] | エラー行数が指定以上の場合、ファイルをスキップ | 例: SKIP_FILE_10 |
4 | 'SKIP_FILE_[num]%' | エラー行数の割合が指定以上の場合、ファイルをスキップ | 例: 'SKIP_FILE_10%' |
5 | ABORT_STATEMENT | エラーが見つかった場合は、ロード操作を中止 | デフォルト |
2)SIZE_LIMIT = num
特定の COPY ステートメントに対して ロードされるデータの最大サイズ(バイト単位)を指定する数値(> 0)。 しきい値を超えると、 COPY 操作はファイルのロードを中止します。
関連記事
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
【Snowflake】ストアド ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2022/12/11/202904
【Snowflake】ストアド ~ 基本編 / ループ ~
https://dk521123.hatenablog.com/entry/2022/12/16/143349
【Snowflake】ストアド ~ 基本編 / 条件分岐 ~
https://dk521123.hatenablog.com/entry/2022/12/26/211349
【Snowflake】ストアド ~ 例外 / EXCEPTION ~
https://dk521123.hatenablog.com/entry/2022/12/23/223345
【Snowflake】ストアド ~ EXECUTE IMMEDIATE ~
https://dk521123.hatenablog.com/entry/2022/12/17/000217
【Snowflake】ストアド ~ デバッグログについて ~
https://dk521123.hatenablog.com/entry/2022/12/18/121334
【Snowflake】ストアド ~ S3内にパーティション構成でUnloadする ~
https://dk521123.hatenablog.com/entry/2022/12/27/225629