【Kafka】Kafkaコネクタ ~ Kafka用Snowflakeコネクタ ~

■ はじめに

業務で、 Kafka用Snowflakeコネクタ を試すことになったのでメモ。
徐々に書き足していく、、、

目次

【1】Kafkaコネクタ
【2】仕様
【3】設定
 1)Kafkaコネクタの種類
 2)設定手順
【4】Kafka構成ファイル
 1)構成ファイル例
 2)Kafka構成プロパティ

【1】Kafkaコネクタ

https://docs.snowflake.com/ja/user-guide/kafka-connector

* Kafka Connectクラスターで実行され、Kafkaトピックからデータを読み取り、
 Snowflakeテーブルにデータを書き込むように設計

【2】仕様

https://docs.snowflake.com/ja/user-guide/kafka-connector-overview#target-tables-for-kafka-topics

* 小文字のトピック名は、大文字のテーブル名に変換
* トピック名の最初の文字が文字(a-z または A-Z)
 またはアンダースコア文字(_)でない場合、
 コネクタはテーブル名の先頭にアンダースコアを追加

【3】設定

https://docs.snowflake.com/ja/user-guide/kafka-connector-install

1)Kafkaコネクタの種類

* 以下の2種類のJARが用意されている
[1] Kafkaの Confluent パッケージバージョン
[2] オープンソースソフトウェア(OSS)用のApache Kafkaパッケージバージョン

https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector

2)設定手順

https://docs.snowflake.com/ja/user-guide/kafka-connector-install#install-the-kafka-connector
[1] JDK / Apache Kafkaのインストール
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#install-apache-kafka
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#install-the-jdk

* 詳細は、以下の関連記事を参照のこと

Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534

[2] Kafkaコネクタに関わるJAR ファイルをダウンロード
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#download-the-kafka-connector-jar-files

* 以下が必要。

[a] snowflake-kafka-connector-1.9.3.jar (★必須)
 => Kafkaコネクタ JAR ファイル

https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.9.3/snowflake-kafka-connector-1.9.3.jar

[b] bc-fips-1.0.1.jar / bcpkix-fips-1.0.3.jar (☆ほぼ必須。暗号化された秘密キーを使用する)
 => Bouncy Castle 暗号化ライブラリ / Bouncy Castle Provider (FIPS Distribution)
 =>Snowflakeは、Bouncy Castleでログインに使用される
  暗号化された RSA 秘密キーを復号化する

[c] avro-1.11.1.jar (☆Avro形式使用時のみ)
 => Kafkaデータが Apache Avro 形式でストリーミングされている場合で使用

[3] Kafkaコネクタのインストール
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#id3

* [3]でダウンロードしたJARファイルを <kafkaディレクトリ>/libs フォルダーに置く

【4】Kafka構成ファイル

https://docs.snowflake.com/ja/user-guide/kafka-connector-install#configuring-the-kafka-connector

* 各構成ファイルは、1つのデータベースと
 そのデータベース内の1つのスキーマのトピックと対応するテーブルを指定
 => <構成ファイル 1> : <DB 1> の関係

* 1つのコネクタは、任意の数のトピックからメッセージを取り込む
 => <コネクタ 1> : <Topic 多> の関係

1)構成ファイル例

* 公式ドキュメントには「分散モード」と「スタンドアロンモード」があるが、
 分散モードの「config」部分の内容は、「スタンドアロンモード」と同じ
 (ファイルフォーマットは異なる)

https://docs.snowflake.com/ja/user-guide/kafka-connector-install#distributed-mode
ファイル例

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}

2)Kafka構成プロパティ

* かなりあるので、詳細は、公式ドキュメント参照。

name (必須)

* アプリケーション名 (Kafkaコネクタで一意)

connector.class (必須)

* com.snowflake.kafka.connector.SnowflakeSinkConnector の一択

topics (必須)

* Topicを指定(Kafkaの仕様と同じで、カンマ区切り)

snowflake.url.name (必須)

* Snowflakeアカウントにアクセスするための ホスト名(http/ポートはオプション)
 e.g. <account_identifier>.snowflakecomputing.com

snowflake.user.name (必須)

* Snowflakeアカウントのユーザーログイン名

snowflake.private.key (必須)

* ユーザーを認証するための秘密キー
 => Snowflakeのキーペア認証については、以下の関連記事を参照のこと

https://dk521123.hatenablog.com/entry/2023/06/08/004532

snowflake.database.name (必須)

* 行を挿入するテーブルを含むデータベース名

snowflake.schema.name (必須)

* 行を挿入するテーブルを含むスキーマ名

key.converter (必須)

* Kafkaのメッセージのキーコンバーター
* 例: "org.apache.kafka.connect.storage.StringConverter"

value.converter (必須)

* Kafkaのメッセージの値コンバーター
* 文字列の場合、"org.apache.kafka.connect.storage.StringConverter"
* JSONの場合、"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
* Avroの場合、"com.snowflake.kafka.connector.records.SnowflakeAvroConverter"
* Avro/Kafkaのスキーマレジストリサービスを使用する場合、
 "com.snowflake.kafka.connector.records.SnowflakeAvroConverter"
* Avro/スキーマを含む(Kafkaのスキーマレジストリサービスを必要としない)場合、
 "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"

snowflake.schema.name (必須)

* 行を挿入するテーブルを含むスキーマ名

header.converter (条件必須)

* レコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要
* 値は "org.apache.kafka.connect.storage.StringConverter"

snowflake.private.key.passphrase (オプション)

* パラメーターの値が空でない場合、
 Kafkaはこのフレーズを使用して秘密キーの復号化を試みる

tasks.max (オプション)

* タスクの数
* 通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じ
* ただし、Snowflakeは高く設定することを非推奨

snowflake.topic2table.map (オプション)

* どのトピックが、どのテーブルにマッピングするかを指定
 => ★これは、オプションだが使えそう

buffer.count.records (オプション)

* Snowflakeにインジェストされる前に、
 Kafkaパーティションごとにメモリにバッファされるメッセージ数
* デフォルト値: 10000

buffer.flush.time (オプション)

* バッファーフラッシュ間の秒数
* フラッシュはKafkaのメモリキャッシュから内部ステージまで
* デフォルト値: 120 秒

buffer.size.bytes (オプション)

* データファイルとしてSnowflakeに取り込まれる前に、
 Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位)
* デフォルト値: 5000000 (5 MB)
 => バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも
  大きくなる可能性がある

関連記事

Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ 設定値 ~
https://dk521123.hatenablog.com/entry/2023/05/28/151212
Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Connector 構成プロパティ ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131
Snowflake ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2021/11/02/130111
Snowflake ~ 入門編 / Hello world
https://dk521123.hatenablog.com/entry/2021/11/22/212520
Snowflake ~ キーペア認証 ~
https://dk521123.hatenablog.com/entry/2023/06/08/004532
Snowflake ~ Snowpipe Streaming ~
https://dk521123.hatenablog.com/entry/2023/07/04/001637