【Snowflake】ストアド ~ ステージ内でデータ0件の場合エラーにする ~

■ はじめに

 ステージからS3にファイルを読み込んで、
そのファイルデータをSnowflakeにロードするって処理がある。
その際にファイルがなかったら、意図的にエラーにして処理を
中断させることを考える
(なんかオプションがあればいいんだけど、探したけどなかった。)

目次

【1】ポイント
【2】サンプル
 1)テストデータ「test.csv」
 2)フォルダ構成例
 3)SQL例
【3】汎用的に使うためにストアド化する
 1)SQL例
【4】補足:ステージのオプション
 1)ON_ERROR
 2)SIZE_LIMIT = num

【1】ポイント

正直、以前やった

https://dk521123.hatenablog.com/entry/2022/12/23/223345

のサンプルで9割ほど完成しているのだが、ポイントをあげておく。
~~~~~~~~~~~~~
[1] ステージしたデータが0件だったら、raise で例外を発生させる
[2] その際に、EXCEPTION ブロックでキャッチしない
~~~~~~~~~~~~~
 => 使い回したい場合は、ストアド関数化すればいい
 (「【3】汎用的に使うためにストアド化する」を参照のこと)

【2】サンプル

1)テストデータ「test.csv

"id","name","age"
"X001","Mike","20"
"X002","Kevin","32"
"X003","Sam","78"

2)フォルダ構成例

s3://your-s3-bucket-name/test/
 date=20221229/ <= Fileあり(エラーを出さずにLoad処理したい)
   test.csv
 date=20221230/ <= Fileなし(エラーを出したい)

3)SQL

use database your_db;
use schema your_schema;

-- ステージ (URLを切り替えてテスト)
CREATE OR REPLACE TEMPORARY STAGE demo_stage
  URL = 's3://your-s3-bucket-name/test/date=20221230/'
  -- URL = 's3://your-s3-bucket-name/test/date=20221219/'
  STORAGE_INTEGRATION = xxxxx
  FILE_FORMAT = (TYPE=CSV SKIP_HEADER=1 FILE NULL_IF=('')
  -- FILE_FORMAT = (TYPE=CSV FILE NULL_IF=('') -- ファイルが全くの空をチェックする場合
;

-- 件数チェック処理
execute immediate $$
declare
  row_count integer default 0;
  zero_count_exception exception (-20404, 'No data');
begin
  let result_set resultset := (select count(*) as row_count from @demo_stage);
  let cursor_for_result_set cursor for result_set;
  for row_variable in cursor_for_result_set do
    let row_count number := row_variable.row_count;
    if (row_count <= 0) then
      raise zero_count_exception; -- データ0件の場合エラーにする
    else
      return row_count;
    end if;
  end for;
end;
$$
;

-- 以降は、ロード処理

-- ロード用テーブル作成
CREATE TABLE test_table IF NOT EXISTS (
  id VARCHAR,
  name VARCHAR,
  age NUMBER
);

-- ロード処理
COPY INTO test_table FROM
(
  SELECT
    $1,
    $2,
    $3 
  FROM
    @demo_stage
);

【3】汎用的に使うためにストアド化する

* 公式ドキュメントの以下を参考に
 「【2】サンプル」の「3)SQL例」を
 汎用的に使うためにストアド化する

https://docs.snowflake.com/ja/developer-guide/snowflake-scripting/resultsets.html#example-constructing-the-sql-statement-dynamically

クエリに対して SQL の文字列を動的に作成する必要がある場合は、 
query を (EXECUTE IMMEDIATE string_of_sql) に設定します。

1)SQL

* create procedure の構文は、以下の公式ドキュメントを参照

https://docs.snowflake.com/ja/sql-reference/sql/create-procedure.html

1) ストアド例

create or replace procedure validate_existing_data(table_name varchar)
returns number
language sql
execute as caller
comment  = 'To check existing data. if no data, raise zero_count_exception exception.'
as
$$
declare
  count_query varchar default 'select count(*) as row_count from ' || :table_name || ';';
  row_count integer default 0;
  zero_count_exception exception (-20404, 'No data');
begin
  let result_set resultset := (execute immediate :count_query);
  let cursor_for_result_set cursor for result_set;
  for row_variable in cursor_for_result_set do
    let row_count number := row_variable.row_count;
    if (row_count <= 0) then
      raise zero_count_exception; -- データ0件の場合エラーにする
    else
      return row_count;
    end if;
  end for;
end;
$$
;

呼び出し例

-- 件数チェック処理
call validate_existing_data('@demo_stage');

【4】補足:ステージのオプション

冒頭で軽く述べた通り、
初め、オプションでエラーを発生できないかなっと調査していた。
で、結局、そういうオプションは見当たらなったが、
探している時に面白そうなオプションがあったので、メモ。

https://docs.snowflake.com/ja/sql-reference/sql/create-stage.html#copy-options-copyoptions

1)ON_ERROR

* ロード操作のエラー処理を指定する
 => こちらは、エラーになった場合の操作
# 設定値 説明 メモ
1 CONTINUE エラーが見つかってもロード続行
2 SKIP_FILE エラーだったらファイルをスキップ Snowpipe時のデフォルト
3 SKIP_FILE_[num] エラー行数が指定以上の場合、ファイルをスキップ 例: SKIP_FILE_10
4 'SKIP_FILE_[num]%' エラー行数の割合が指定以上の場合、ファイルをスキップ 例: 'SKIP_FILE_10%'
5 ABORT_STATEMENT エラーが見つかった場合は、ロード操作を中止 デフォルト

2)SIZE_LIMIT = num

特定の COPY ステートメントに対して
ロードされるデータの最大サイズ(バイト単位)を指定する数値(> 0)。
しきい値を超えると、 COPY 操作はファイルのロードを中止します。

関連記事

Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake】ストアド ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2022/12/11/202904
Snowflake】ストアド ~ 基本編 / ループ ~
https://dk521123.hatenablog.com/entry/2022/12/16/143349
Snowflake】ストアド ~ 基本編 / 条件分岐 ~
https://dk521123.hatenablog.com/entry/2022/12/26/211349
Snowflake】ストアド ~ 例外 / EXCEPTION ~
https://dk521123.hatenablog.com/entry/2022/12/23/223345
Snowflake】ストアド ~ EXECUTE IMMEDIATE ~
https://dk521123.hatenablog.com/entry/2022/12/17/000217
Snowflake】ストアド ~ デバッグログについて ~
https://dk521123.hatenablog.com/entry/2022/12/18/121334
Snowflake】ストアド ~ S3内にパーティション構成でUnloadする ~
https://dk521123.hatenablog.com/entry/2022/12/27/225629