【Snowflake】Snowflake ~ Snowpipe ~

■ はじめに

仕事で、Snowflake の Snowpipe を試しそうなので
予習しておく

目次

【1】Snowpipe
 1)公式ドキュメント
【2】SQL文
 1)CREATE PIPE
 2)SHOW PIPES
【3】使用上の注意
 1)推奨ロードファイルサイズ
 2)日時関数の使用
 3)ファイルの削除
【4】Snowpipe を使ったデータロード
 1)全体構成
 2)前提条件
 3)作業手順

【1】Snowpipe

https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro

ステージ上でファイルが利用できるようになったことを検知し、
ファイルを継続的にロードする

1)公式ドキュメント

https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-errors-sns

【2】SQL

1)CREATE PIPE

create pipe <Snowpipe名(必須)>
 auto_ingest=true
 integration='<>'
 comment='<コメント文>'
 as
   -- copy_statement は必須
   copy into <対象テーブル>
   from @<ステージ名>
;

https://docs.snowflake.com/ja/sql-reference/sql/create-pipe

AUTO_INGEST (TRUE | FALSE)

* イベント通知受信時、データファイルを自動的にロードするかどうか
 + TRUE : 外部ステージからのロード
 + FALSE : 自動データロードを無効
  => この場合、Snowpipe REST API エンドポイントを呼び出す必要がある)

2)SHOW PIPES

* アクセス権限があるパイプ一覧を表示

https://docs.snowflake.com/ja/sql-reference/sql/show-pipes

SHOW PIPES;

出力
https://docs.snowflake.com/ja/sql-reference/sql/show-pipes#output

出力項目 説明 メモ
created_on パイプが作成された日時
name パイプ名
database_name パイプが格納されているデータベース
schema_name パイプが格納されているスキーマ
definition COPY キューファイルからSnowflakeテーブルにデータをロードするために使用されるステートメント
owner パイプを所有するロール名
notification_channel DEFINITION 列で指定されたステージのAmazon SQS キューのAmazonリソース名 ★重要★
comment パイプのコメント文
integration パイプの通知統合名 Google Cloud StorageまたはMicrosoft Azureのみで、AWSは特に気にせんでいい?
pattern PATTERN コピーオプションが指定されている場合、パイプ定義の COPY INTO <テーブル> ステートメントのコピーオプション値
error_integration 通知をトリガーするためにAmazon S3クラウドストレージのエラーイベントに依存するパイプの通知統合名

【3】使用上の注意

1)推奨ロードファイルサイズ

以前の記事

Snowflakeのパフォーマンス改善 ~ データロードの改善 ~
https://dk521123.hatenablog.com/entry/2022/12/07/111847

で言っていた
「取り込むファイルサイズの統一(100~250 MBまたはそれ以上)」
と同じことを言っていおり

https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro#recommended-load-file-size

より抜粋
~~~~~~~~~~~~~~
Snowpipeで最も効率的で費用対効果の高いロードエクスペリエンスを得るには、 
ファイルサイズのベストプラクティスと制限事項 のファイルサイズに関する
推奨事項に従い、1分ごとにファイルをステージングすることをお勧めします。
~~~~~~~~~~~~~~

https://docs.snowflake.com/ja/user-guide/data-load-considerations-prepare#label-snowpipe-file-size

より抜粋
~~~~~~~~~~~~~~
Snowpipeで最も効率的で費用対効果の高いロードエクスペリエンスを得るために、
 ファイルサイズ設定のベストプラクティスと制限事項 (このトピック内)の
ファイルサイズ設定の推奨事項に従うことをお勧めします。
サイズがおよそ100から250 MB 以上のデータファイルをロードすると、
ロードされるデータの合計量に比べて、オーバーヘッドコストが重要ではなくなるまで
オーバーヘッド料金が削減されます。
~~~~~~~~~~~~~~

2)日時関数の使用

https://docs.snowflake.com/ja/sql-reference/sql/create-pipe#required-parameters

より抜粋
~~~~~~~~~~~~~~~~
現在、Snowpipeの copy_statement で
次の関数を使用することはお勧めしません。

CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP

これらの関数を使用して挿入された時間値は、
 COPY_HISTORY 関数 または COPY_HISTORY ビュー によって
返される LOAD_TIME 値よりも数時間早くなる可能性があるという
既知の問題があります。

★解決案★
代わりに METADATA$START_SCAN_TIME をクエリすることをお勧めします。
これにより、記録のロードがより正確に表現されます。
~~~~~~~~~~~~~~~~

METADATA$START_SCAN_TIME
https://docs.snowflake.com/ja/user-guide/querying-metadata#metadata-columns

ステージングされたデータファイルにある
各記録の操作開始タイムスタンプ。
TIMESTAMP_LTZ として返されます。

TIMESTAMP_LTZ , TIMESTAMP_NTZ , TIMESTAMP_TZ
https://docs.snowflake.com/ja/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz

* TIMESTAMP_LTZ
TIMESTAMP_LTZ は、 UTC 時間を指定された精度で内部に保存します。
ただし、すべての操作は、 TIMEZONE セッションパラメーターによって
制御される現在のセッションのタイムゾーンで実行されます。
 => よくわからん、、、

https://dev.classmethod.jp/articles/snowflake-three-types-of-timestamp/

timezoneセッションパラメーターの値に応じて値が動的に変化するのが、
TIMESTAMP_LTZ
 => ローカルタイムゾーンに依存する

自分用メモ

-- 明示的にUTCにしたい場合、↓でOK?
convert_timezone('UTC', METADATA$START_SCAN_TIME)

3)ファイルの削除

* 今のところ、独自でハンドリングしないとダメらしい、、、

https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-manage#deleting-staged-files-after-snowpipe-loads-the-data

より抜粋
~~~~~~~~~~~~~~~
パイプオブジェクトは、 PURGE コピーオプションをサポートしていません。
Snowpipeは、データがテーブルに正常にロードされたときに、
ステージングされたファイルを自動的に削除できません。

不要になったステージングされたファイルを削除するには、 
REMOVE コマンドを定期的に実行してファイルを削除することをお勧めします。
または、クラウドストレージサービスプロバイダーが
提供するライフサイクル管理機能を構成します。
~~~~~~~~~~~~~~~
 => COPY の後に、REMOVE実行させてくれればいいのでは、、、

https://docs.snowflake.com/ja/sql-reference/sql/create-pipe#usage-notes

より抜粋
~~~~~~~~~~~~~~~
次を【除く】、
すべての COPY INTO <テーブル> コピーオプションがサポートされています。

+ FILES = ( 'file_name1' [ , 'file_name2', ... ] )
+ ON_ERROR = ABORT_STATEMENT
+ SIZE_LIMIT = num
+ PURGE = TRUE | FALSE (つまり、ロード中の自動パージ)
+ FORCE = TRUE | FALSE
 REMOVE コマンドを使用して、内部(つまりSnowflake)ステージから
 ファイルを(ロード後に)手動で削除できます。
+ RETURN_FAILED_ONLY = TRUE | FALSE
+ VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
~~~~~~~~~~~~~~~
 => 【除く】かよ、、、

【4】Snowpipe を使ったデータロード

* 以下の公式ドキュメントを一読しておくといいかも。

https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3

1)全体構成

[1] S3
[2] SQS (Simple Queue Service) ... フルマネージドのキューイングサービス
[3] Snowflake (Snowpipe)

2)前提条件

[1] ストレージ統合および外部ステージが作成されていること
 => ストレージ統合/ステージについては、以下の関連記事を参照のこと

Snowflake ~ ストレージ統合 ~
https://dk521123.hatenablog.com/entry/2022/06/29/221037
Snowflake ~ 基本編 / ステージ ~
https://dk521123.hatenablog.com/entry/2022/09/01/220643

-- ストレージ統合
CREATE STORAGE INTEGRATION demo_storage_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = S3
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/your-role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://your-s3-bucket-name/')
;
-- ステージ
CREATE STAGE IF NOT EXISTS sample_db.public.sample_stage
  URL = 's3://your-s3-bucket-name/demo_warehouse/demo_database/demo_table/'
  STORAGE_INTEGRATION = demo_storage_integration
  FILE_FORMAT = (TYPE =PARQUET COMPRESSION = SNAPPY)
;

3)作業手順

[1] Snowpipe作成
[2] Snowpipeで使うAWS SQS情報を取得
[3] S3バケットに対し、SQSへの通知設定を有効化

[1] Snowpipe作成

create pipe demo_pipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_yourbucket'
  as
  copy into sample_db.public.sample_table
  from @sample_db.public.sample_stage
  file_format = (type = 'PARQUET' compression = 'SNAPPY');

[2] Snowpipeで使うAWS SQS情報を取得

-- 「show pipes;」で「notification_channel」を
-- テキストファイルなどに保存しておく
show pipes like '%demo_pipe_s3%';

[3] S3バケットに対し、SQSへの通知設定を有効化

1) S3コンソールの[プロパティ]を開き、[イベント通知]を選択
2) 設定値を入力
 => 今回、以下のS3にparquetファイル(拡張子 .parquet)が入っているとする
  s3://your-s3-bucket-name/demo_warehouse/demo_database/demo_table/

 + プレフィクスオプション: demo_warehouse/demo_database/demo_table/
 + サフィックスオプション:.parquet
 + イベントタイプ:すべてのオブジェクト作成イベントにチェック
 + SQSキュー:(「[2] Snowpipeで使うAWS SQS情報を取得」の値)

参考文献

https://qiita.com/KimiyukiMuramatsu/items/a88b0e527377ecd35626
https://dev.classmethod.jp/articles/try-continuous-data-loading-with-snowpipe/
https://dev.classmethod.jp/articles/googlesheets-to-snowflake/

関連記事

Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ 基本編 / ステージ ~
https://dk521123.hatenablog.com/entry/2022/09/01/220643
Snowflake ~ ストレージ統合 ~
https://dk521123.hatenablog.com/entry/2022/06/29/221037
Snowflake ~ Task ~
https://dk521123.hatenablog.com/entry/2023/04/17/174732
Snowflake ~ Snowpipe Streaming ~
https://dk521123.hatenablog.com/entry/2023/07/04/001637
Snowflakeのパフォーマンス改善 ~ データロードの改善 ~
https://dk521123.hatenablog.com/entry/2022/12/07/111847