■ はじめに
https://dk521123.hatenablog.com/entry/2023/04/15/225515
で、Snowpipe を扱ったが、 今回は、Snowpipe Streaming について、徐々にメモっていく
目次
【1】Snowpipe Streaming 1)Snowpipe との比較表 【2】必要最小バージョン 【3】構成プロパティ メモ:snowflake.enable.schematization
【1】Snowpipe Streaming
色々読んでみたが、結局、以下の公式ドキュメントの図において Snowpipe と Snowpipe Streaming の比較図を見た方が理解しやすいかも。
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-streaming-kafka
* データを行単位でSnowflakeテーブルに書き込むことができる
1)Snowpipe との比較表
Items | Snowpipe Streaming | Snowpipe | Memo |
---|---|---|---|
ロードするデータ形式 | 行単位 | ファイル単位 | Snowpipeの場合、指定されたフラッシュバッファーのしきい値に達すると、仮のステージングされたファイルにデータを書き込む |
データの順序 | 各チャネル内の順序付き挿入 | 順序はコントロールできない | |
パイプオブジェクト | 不要 | 必要(ステージングされたファイルデータをキューに入れ、ターゲットテーブルにロードするために必要) | 公式ドキュメントの図を見るといいかも |
【2】必要最小バージョン
https://docs.snowflake.com/ja/user-guide/data-load-snowpipe-streaming-kafka#minimum-required-version
* Kafkaコネクタバージョン1.9.1(またはそれ以上) => Kafkaコネクタに関する詳細は、以下の関連記事を参照のこと
Kafkaコネクタ ~ Kafka用Snowflakeコネクタ ~
https://dk521123.hatenablog.com/entry/2023/06/07/144114
【3】構成プロパティ
{ "name":"XYZCompanySensorData", "config":{ // ストリーミングインジェストクライアントとしてKafkaコネクタを使用する場合にのみ必要 "snowflake.ingestion.method":"SNOWPIPE_STREAMING", // テーブルに行を挿入するときに使用するアクセス制御ロール "snowflake.role.name":"", // Kafkaコネクタで発生したエラーの処理方法を指定(ALL: エラー無視) "errors.tolerance":"NONE", // Kafka Connectログファイルにエラーメッセージを書き込むかどうかを指定 "errors.log.enable":"TRUE", ... } }
メモ:snowflake.enable.schematization
* Schematization を有効にするかどうか(true/false)
falseの場合
* VARIANT型のCONTENT_METADATAに格納される * スキーマレスでスキーマに変更があっても影響しない
trueの場合
* 初期時は、テーブルを作成 * 以降は、データが追加/更新/削除
https://medium.com/snowflake/data-loading-schematization-with-snowflake-d75d9bbd3bee
コードから追う
* 以下で使われている
関連記事
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ Snowpipe ~
https://dk521123.hatenablog.com/entry/2023/04/15/225515
Kafkaコネクタ ~ Kafka用Snowflakeコネクタ ~
https://dk521123.hatenablog.com/entry/2023/06/07/144114