■ はじめに
仕事で、Snowflake の Snowpipe を試しそうなので 予習しておく
目次
【1】Snowpipe 1)公式ドキュメント 【2】SQL文 1)CREATE PIPE 2)SHOW PIPES 【3】使用上の注意 1)推奨ロードファイルサイズ 2)日時関数の使用 3)ファイルの削除 【4】Snowpipe を使ったデータロード 1)全体構成 2)前提条件 3)作業手順
【1】Snowpipe
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro
ステージ上でファイルが利用できるようになったことを検知し、 ファイルを継続的にロードする
1)公式ドキュメント
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-errors-sns
【2】SQL文
1)CREATE PIPE
create pipe <Snowpipe名(必須)> auto_ingest=true integration='<>' comment='<コメント文>' as -- copy_statement は必須 copy into <対象テーブル> from @<ステージ名> ;
https://docs.snowflake.com/ja/sql-reference/sql/create-pipe
AUTO_INGEST (TRUE | FALSE)
* イベント通知受信時、データファイルを自動的にロードするかどうか + TRUE : 外部ステージからのロード + FALSE : 自動データロードを無効 => この場合、Snowpipe REST API エンドポイントを呼び出す必要がある)
2)SHOW PIPES
* アクセス権限があるパイプ一覧を表示
https://docs.snowflake.com/ja/sql-reference/sql/show-pipes
SHOW PIPES;
出力
https://docs.snowflake.com/ja/sql-reference/sql/show-pipes#output
出力項目 | 説明 | メモ |
---|---|---|
created_on | パイプが作成された日時 | |
name | パイプ名 | |
database_name | パイプが格納されているデータベース | |
schema_name | パイプが格納されているスキーマ | |
definition | COPY キューファイルからSnowflakeテーブルにデータをロードするために使用されるステートメント | |
owner | パイプを所有するロール名 | |
notification_channel | DEFINITION 列で指定されたステージのAmazon SQS キューのAmazonリソース名 | ★重要★ |
comment | パイプのコメント文 | |
integration | パイプの通知統合名 | Google Cloud StorageまたはMicrosoft Azureのみで、AWSは特に気にせんでいい? |
pattern | PATTERN コピーオプションが指定されている場合、パイプ定義の COPY INTO <テーブル> ステートメントのコピーオプション値 | |
error_integration | 通知をトリガーするためにAmazon S3クラウドストレージのエラーイベントに依存するパイプの通知統合名 |
【3】使用上の注意
1)推奨ロードファイルサイズ
以前の記事
Snowflakeのパフォーマンス改善 ~ データロードの改善 ~
https://dk521123.hatenablog.com/entry/2022/12/07/111847
で言っていた 「取り込むファイルサイズの統一(100~250 MBまたはそれ以上)」 と同じことを言っていおり
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-intro#recommended-load-file-size
より抜粋 ~~~~~~~~~~~~~~ Snowpipeで最も効率的で費用対効果の高いロードエクスペリエンスを得るには、 ファイルサイズのベストプラクティスと制限事項 のファイルサイズに関する 推奨事項に従い、1分ごとにファイルをステージングすることをお勧めします。 ~~~~~~~~~~~~~~
https://docs.snowflake.com/ja/user-guide/data-load-considerations-prepare#label-snowpipe-file-size
より抜粋 ~~~~~~~~~~~~~~ Snowpipeで最も効率的で費用対効果の高いロードエクスペリエンスを得るために、 ファイルサイズ設定のベストプラクティスと制限事項 (このトピック内)の ファイルサイズ設定の推奨事項に従うことをお勧めします。 サイズがおよそ100から250 MB 以上のデータファイルをロードすると、 ロードされるデータの合計量に比べて、オーバーヘッドコストが重要ではなくなるまで オーバーヘッド料金が削減されます。 ~~~~~~~~~~~~~~
2)日時関数の使用
https://docs.snowflake.com/ja/sql-reference/sql/create-pipe#required-parameters
より抜粋 ~~~~~~~~~~~~~~~~ 現在、Snowpipeの copy_statement で 次の関数を使用することはお勧めしません。 CURRENT_DATE CURRENT_TIME CURRENT_TIMESTAMP GETDATE LOCALTIME LOCALTIMESTAMP SYSDATE SYSTIMESTAMP これらの関数を使用して挿入された時間値は、 COPY_HISTORY 関数 または COPY_HISTORY ビュー によって 返される LOAD_TIME 値よりも数時間早くなる可能性があるという 既知の問題があります。 ★解決案★ 代わりに METADATA$START_SCAN_TIME をクエリすることをお勧めします。 これにより、記録のロードがより正確に表現されます。 ~~~~~~~~~~~~~~~~
METADATA$START_SCAN_TIME
https://docs.snowflake.com/ja/user-guide/querying-metadata#metadata-columns
ステージングされたデータファイルにある 各記録の操作開始タイムスタンプ。 TIMESTAMP_LTZ として返されます。
TIMESTAMP_LTZ , TIMESTAMP_NTZ , TIMESTAMP_TZ
https://docs.snowflake.com/ja/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz
* TIMESTAMP_LTZ TIMESTAMP_LTZ は、 UTC 時間を指定された精度で内部に保存します。 ただし、すべての操作は、 TIMEZONE セッションパラメーターによって 制御される現在のセッションのタイムゾーンで実行されます。 => よくわからん、、、
https://dev.classmethod.jp/articles/snowflake-three-types-of-timestamp/
timezoneセッションパラメーターの値に応じて値が動的に変化するのが、 TIMESTAMP_LTZ => ローカルタイムゾーンに依存する
自分用メモ
-- 明示的にUTCにしたい場合、↓でOK? convert_timezone('UTC', METADATA$START_SCAN_TIME)
3)ファイルの削除
* 今のところ、独自でハンドリングしないとダメらしい、、、
より抜粋 ~~~~~~~~~~~~~~~ パイプオブジェクトは、 PURGE コピーオプションをサポートしていません。 Snowpipeは、データがテーブルに正常にロードされたときに、 ステージングされたファイルを自動的に削除できません。 不要になったステージングされたファイルを削除するには、 REMOVE コマンドを定期的に実行してファイルを削除することをお勧めします。 または、クラウドストレージサービスプロバイダーが 提供するライフサイクル管理機能を構成します。 ~~~~~~~~~~~~~~~ => COPY の後に、REMOVE実行させてくれればいいのでは、、、
https://docs.snowflake.com/ja/sql-reference/sql/create-pipe#usage-notes
より抜粋 ~~~~~~~~~~~~~~~ 次を【除く】、 すべての COPY INTO <テーブル> コピーオプションがサポートされています。 + FILES = ( 'file_name1' [ , 'file_name2', ... ] ) + ON_ERROR = ABORT_STATEMENT + SIZE_LIMIT = num + PURGE = TRUE | FALSE (つまり、ロード中の自動パージ) + FORCE = TRUE | FALSE REMOVE コマンドを使用して、内部(つまりSnowflake)ステージから ファイルを(ロード後に)手動で削除できます。 + RETURN_FAILED_ONLY = TRUE | FALSE + VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS ~~~~~~~~~~~~~~~ => 【除く】かよ、、、
【4】Snowpipe を使ったデータロード
* 以下の公式ドキュメントを一読しておくといいかも。
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-auto-s3
1)全体構成
[1] S3 [2] SQS (Simple Queue Service) ... フルマネージドのキューイングサービス [3] Snowflake (Snowpipe)
2)前提条件
[1] ストレージ統合および外部ステージが作成されていること => ストレージ統合/ステージについては、以下の関連記事を参照のこと
Snowflake ~ ストレージ統合 ~
https://dk521123.hatenablog.com/entry/2022/06/29/221037
Snowflake ~ 基本編 / ステージ ~
https://dk521123.hatenablog.com/entry/2022/09/01/220643
-- ストレージ統合 CREATE STORAGE INTEGRATION demo_storage_integration TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/your-role' STORAGE_ALLOWED_LOCATIONS = ('s3://your-s3-bucket-name/') ; -- ステージ CREATE STAGE IF NOT EXISTS sample_db.public.sample_stage URL = 's3://your-s3-bucket-name/demo_warehouse/demo_database/demo_table/' STORAGE_INTEGRATION = demo_storage_integration FILE_FORMAT = (TYPE =PARQUET COMPRESSION = SNAPPY) ;
3)作業手順
[1] Snowpipe作成 [2] Snowpipeで使うAWS SQS情報を取得 [3] S3バケットに対し、SQSへの通知設定を有効化
[1] Snowpipe作成
create pipe demo_pipe_s3 auto_ingest = true aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_yourbucket' as copy into sample_db.public.sample_table from @sample_db.public.sample_stage file_format = (type = 'PARQUET' compression = 'SNAPPY');
[2] Snowpipeで使うAWS SQS情報を取得
-- 「show pipes;」で「notification_channel」を -- テキストファイルなどに保存しておく show pipes like '%demo_pipe_s3%';
[3] S3バケットに対し、SQSへの通知設定を有効化
1) S3コンソールの[プロパティ]を開き、[イベント通知]を選択 2) 設定値を入力 => 今回、以下のS3にparquetファイル(拡張子 .parquet)が入っているとする s3://your-s3-bucket-name/demo_warehouse/demo_database/demo_table/ + プレフィクスオプション: demo_warehouse/demo_database/demo_table/ + サフィックスオプション:.parquet + イベントタイプ:すべてのオブジェクト作成イベントにチェック + SQSキュー:(「[2] Snowpipeで使うAWS SQS情報を取得」の値)
参考文献
https://qiita.com/KimiyukiMuramatsu/items/a88b0e527377ecd35626
https://dev.classmethod.jp/articles/try-continuous-data-loading-with-snowpipe/
https://dev.classmethod.jp/articles/googlesheets-to-snowflake/
関連記事
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ 基本編 / ステージ ~
https://dk521123.hatenablog.com/entry/2022/09/01/220643
Snowflake ~ ストレージ統合 ~
https://dk521123.hatenablog.com/entry/2022/06/29/221037
Snowflake ~ Task ~
https://dk521123.hatenablog.com/entry/2023/04/17/174732
Snowflake ~ Snowpipe Streaming ~
https://dk521123.hatenablog.com/entry/2023/07/04/001637
Snowflakeのパフォーマンス改善 ~ データロードの改善 ~
https://dk521123.hatenablog.com/entry/2022/12/07/111847