■ はじめに
業務で、 Kafka用Snowflakeコネクタ を試すことになったのでメモ。 徐々に書き足していく、、、
目次
【1】Kafkaコネクタ 【2】仕様 【3】設定 1)Kafkaコネクタの種類 2)設定手順 【4】Kafka構成ファイル 1)構成ファイル例 2)Kafka構成プロパティ
【1】Kafkaコネクタ
https://docs.snowflake.com/ja/user-guide/kafka-connector
* Kafka Connectクラスターで実行され、Kafkaトピックからデータを読み取り、 Snowflakeテーブルにデータを書き込むように設計
【2】仕様
https://docs.snowflake.com/ja/user-guide/kafka-connector-overview#target-tables-for-kafka-topics
* 小文字のトピック名は、大文字のテーブル名に変換 * トピック名の最初の文字が文字(a-z または A-Z) またはアンダースコア文字(_)でない場合、 コネクタはテーブル名の先頭にアンダースコアを追加
【3】設定
https://docs.snowflake.com/ja/user-guide/kafka-connector-install
1)Kafkaコネクタの種類
* 以下の2種類のJARが用意されている [1] Kafkaの Confluent パッケージバージョン [2] オープンソースソフトウェア(OSS)用のApache Kafkaパッケージバージョン
https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector
2)設定手順
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#install-the-kafka-connector
[1] JDK / Apache Kafkaのインストール
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#install-apache-kafka
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#install-the-jdk
* 詳細は、以下の関連記事を参照のこと
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
[2] Kafkaコネクタに関わるJAR ファイルをダウンロード
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#download-the-kafka-connector-jar-files
* 以下が必要。 [a] snowflake-kafka-connector-1.9.3.jar (★必須) => Kafkaコネクタ JAR ファイル
[b] bc-fips-1.0.1.jar / bcpkix-fips-1.0.3.jar (☆ほぼ必須。暗号化された秘密キーを使用する) => Bouncy Castle 暗号化ライブラリ / Bouncy Castle Provider (FIPS Distribution) =>Snowflakeは、Bouncy Castleでログインに使用される 暗号化された RSA 秘密キーを復号化する [c] avro-1.11.1.jar (☆Avro形式使用時のみ) => Kafkaデータが Apache Avro 形式でストリーミングされている場合で使用
[3] Kafkaコネクタのインストール
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#id3
* [3]でダウンロードしたJARファイルを <kafkaディレクトリ>/libs フォルダーに置く
【4】Kafka構成ファイル
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#configuring-the-kafka-connector
* 各構成ファイルは、1つのデータベースと そのデータベース内の1つのスキーマのトピックと対応するテーブルを指定 => <構成ファイル 1> : <DB 1> の関係 * 1つのコネクタは、任意の数のトピックからメッセージを取り込む => <コネクタ 1> : <Topic 多> の関係
1)構成ファイル例
* 公式ドキュメントには「分散モード」と「スタンドアロンモード」があるが、 分散モードの「config」部分の内容は、「スタンドアロンモード」と同じ (ファイルフォーマットは異なる)
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#distributed-mode
ファイル例
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter", "value.converter.schema.registry.url":"http://localhost:8081", "value.converter.basic.auth.credentials.source":"USER_INFO", "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword" } }
2)Kafka構成プロパティ
* かなりあるので、詳細は、公式ドキュメント参照。
name (必須)
* アプリケーション名 (Kafkaコネクタで一意)
connector.class (必須)
* com.snowflake.kafka.connector.SnowflakeSinkConnector の一択
topics (必須)
* Topicを指定(Kafkaの仕様と同じで、カンマ区切り)
snowflake.url.name (必須)
* Snowflakeアカウントにアクセスするための ホスト名(http/ポートはオプション) e.g. <account_identifier>.snowflakecomputing.com
snowflake.user.name (必須)
* Snowflakeアカウントのユーザーログイン名
snowflake.private.key (必須)
* ユーザーを認証するための秘密キー => Snowflakeのキーペア認証については、以下の関連記事を参照のこと
https://dk521123.hatenablog.com/entry/2023/06/08/004532
snowflake.database.name (必須)
* 行を挿入するテーブルを含むデータベース名
snowflake.schema.name (必須)
* 行を挿入するテーブルを含むスキーマ名
key.converter (必須)
* Kafkaのメッセージのキーコンバーター * 例: "org.apache.kafka.connect.storage.StringConverter"
value.converter (必須)
* Kafkaのメッセージの値コンバーター * 文字列の場合、"org.apache.kafka.connect.storage.StringConverter" * JSONの場合、"com.snowflake.kafka.connector.records.SnowflakeJsonConverter" * Avroの場合、"com.snowflake.kafka.connector.records.SnowflakeAvroConverter" * Avro/Kafkaのスキーマレジストリサービスを使用する場合、 "com.snowflake.kafka.connector.records.SnowflakeAvroConverter" * Avro/スキーマを含む(Kafkaのスキーマレジストリサービスを必要としない)場合、 "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"
snowflake.schema.name (必須)
* 行を挿入するテーブルを含むスキーマ名
header.converter (条件必須)
* レコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要 * 値は "org.apache.kafka.connect.storage.StringConverter"
snowflake.private.key.passphrase (オプション)
* パラメーターの値が空でない場合、 Kafkaはこのフレーズを使用して秘密キーの復号化を試みる
tasks.max (オプション)
* タスクの数 * 通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じ * ただし、Snowflakeは高く設定することを非推奨
snowflake.topic2table.map (オプション)
* どのトピックが、どのテーブルにマッピングするかを指定 => ★これは、オプションだが使えそう
buffer.count.records (オプション)
* Snowflakeにインジェストされる前に、 Kafkaパーティションごとにメモリにバッファされるメッセージ数 * デフォルト値: 10000
buffer.flush.time (オプション)
* バッファーフラッシュ間の秒数 * フラッシュはKafkaのメモリキャッシュから内部ステージまで * デフォルト値: 120 秒
buffer.size.bytes (オプション)
* データファイルとしてSnowflakeに取り込まれる前に、 Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位) * デフォルト値: 5000000 (5 MB) => バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも 大きくなる可能性がある
関連記事
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ 設定値 ~
https://dk521123.hatenablog.com/entry/2023/05/28/151212
Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Connector 構成プロパティ ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world ~
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ キーペア認証 ~
https://dk521123.hatenablog.com/entry/2023/06/08/004532
Snowflake ~ Snowpipe Streaming ~
https://dk521123.hatenablog.com/entry/2023/07/04/001637