【Kafka】Kafka Connect ~ Connector 構成プロパティ ~

■ はじめに

 今日ってゆーかー昨日、
Kafka Connect の Connector 構成プロパティの指定がミスってて
2~3日ハマって迷惑かけたので、メモ。

目次

【1】接続
 1)connection.url
 2)connection.user
 3)connection.password
 4)connection.attempts
 5)dialect.name
【2】書き込み
 1)insert.mode
 2)batch.size
 3)delete.enabled
【3】データマッピング
 1)table.name.format
 2)pk.mode
 3)pk.fields
 4)fields.whitelist
 5)db.timezone
【4】DDL サポート
 1)auto.create
 2)auto.evolve
 3)quote.sql.identifiers
【5】再試行
 1)max.retries
 2)retry.backoff.ms
【6】トラブル
 1)物凄く遅くDBに更新される現象が発生

【1】接続

https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html#connection

1)connection.url

* JDBC URL

2)connection.user

* 接続先のユーザ名
 => 上のサイトの説明間違ってる?

3)connection.password

* 接続先のパスワード

4)connection.attempts

* 有効な JDBC 接続の取得を再試行する最大回数

5)dialect.name

* コネクターで使用する必要があるデータベース言語の名前

【2】書き込み

https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html#writes

1)insert.mode

* Insertモード
* 指定可能な値: [insert、upsert、update]

[1] insert

* SQL の INSERT

[2] upsert

* 
* upsert モードを使用する場合は、コネクター構成に
 pk.mode プロパティと pk.fields プロパティを追加する必要がある
 => ★ここでハマった★

[3] update

* SQL の UPDATE 

2)batch.size

* バッチ サイズ(メッセージグループサイズ)は、
 そのグループを送信する前になければならないバイト数
* デフォルト: 3000

補足:バッチ(batch)とは?

* Producerが、単一パーティションで送信されるメッセージグループのこと

https://learn.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-performance-tuning
得られる効果は?

* batch.size を増やすと、
 ネットワークと IO 要求からのオーバーヘッドの処理が減るため、
 スループットを向上
 => 負荷が低く、バッチ サイズが大きくなると、
  プロデューサーはバッチの準備が完了するのを待機するため、
  Kafka の送信待ち時間が増える可能性がある
 => 負荷が高い場合は、スループットを向上させて待ち時間を減らすために、
  バッチ サイズを増やすと、パフォーマンス向上を見込める

3)delete.enabled

* null レコード値を削除として扱うかどうか
* pk.mode を record_key に設定する必要がある
* デフォルト: false

【3】データマッピング

https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html#data-mapping

1)table.name.format

* マップ先テーブル名のフォーマット制御文字列
* マップ元のトピック名を表すプレースホルダーとして「${topic}」を含めることができる
* 例:トピック「orders」の kafka_${topic} は、テーブル名「kafka_orders」にマップされる
* デフォルト: ${topic}

2)pk.mode

* プライマリキーモード
* 指定可能な値: [none、kafka、record_key、record_value]

[1] none

* キー指定しない

[2] kafka

* Apache Kafka® 座標???を PK として使用

[3] record_key

* レコードキーのフィールドを使用

[4] record_value

* レコード値のフィールドを使用

3)pk.fields

* プライマリキーのフィールド名のコンマ区切りのリスト
* pk.mode に依存

[1] none

* 何もしない

[2] kafka

* Kafka 座標を表す 3 つの値である必要がある
* 空の場合はデフォルトで __connect_topic,__connect_partition,__connect_offset になる

[3] record_key

* 指定されている場合は、目的のフィールドの抽出に使用
* 空の場合、キー構造体のすべてのフィールドが利用

[4] record_value

* 指定されている場合は、目的のフィールドの抽出に使用
* 空の場合は、値構造体からすべてのフィールドが利用

4)fields.whitelist

* レコード値フィールド名のコンマ区切りのリスト
* リストを設定した場合は、目的のフィールドのフィルター処理に使用
* 空の場合は、レコード値からすべてのフィールドが利用

5)db.timezone

* コネクターで時間ベースの値を挿入する場合に使用
* デフォルト: "UTC"

【4】DDL サポート

https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html#ddl-support

1)auto.create

* 送信先テーブルが存在しない場合に、
 CREATE を自動的に作成するかどうかを指定
* デフォルト: false

2)auto.evolve

* レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、
 ALTER を発行して自動的に追加するかどうかを指定
* デフォルト: false

3)quote.sql.identifiers

* SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定

【5】再試行

https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html#retries

1)max.retries

* エラー時に再試行する最大回数
* デフォルト: 10

2)retry.backoff.ms

* エラーの後、再試行するまでの待ち時間(ミリ秒)
* デフォルト: 3000

関連記事

Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ 基本編 / PostgreSQL
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Kafka Connect ~ Rest API
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Apache Kafka ~ Strimzi ~
https://dk521123.hatenablog.com/entry/2023/05/08/000133
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100