■ はじめに
今日ってゆーかー昨日、 Kafka Connect の Connector 構成プロパティの指定がミスってて 2~3日ハマって迷惑かけたので、メモ。
目次
【1】接続 1)connection.url 2)connection.user 3)connection.password 4)connection.attempts 5)dialect.name 【2】書き込み 1)insert.mode 2)batch.size 3)delete.enabled 【3】データマッピング 1)table.name.format 2)pk.mode 3)pk.fields 4)fields.whitelist 5)db.timezone 【4】DDL サポート 1)auto.create 2)auto.evolve 3)quote.sql.identifiers 【5】再試行 1)max.retries 2)retry.backoff.ms 【6】トラブル 1)物凄く遅くDBに更新される現象が発生
【1】接続
1)connection.url
* JDBC URL
2)connection.user
* 接続先のユーザ名 => 上のサイトの説明間違ってる?
3)connection.password
* 接続先のパスワード
4)connection.attempts
* 有効な JDBC 接続の取得を再試行する最大回数
5)dialect.name
* コネクターで使用する必要があるデータベース言語の名前
【2】書き込み
1)insert.mode
* Insertモード * 指定可能な値: [insert、upsert、update]
[1] insert
* SQL の INSERT
[2] upsert
* * upsert モードを使用する場合は、コネクター構成に pk.mode プロパティと pk.fields プロパティを追加する必要がある => ★ここでハマった★
[3] update
* SQL の UPDATE
2)batch.size
* バッチ サイズ(メッセージグループサイズ)は、 そのグループを送信する前になければならないバイト数 * デフォルト: 3000
補足:バッチ(batch)とは?
* Producerが、単一パーティションで送信されるメッセージグループのこと
https://learn.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-performance-tuning
得られる効果は?
* batch.size を増やすと、 ネットワークと IO 要求からのオーバーヘッドの処理が減るため、 スループットを向上 => 負荷が低く、バッチ サイズが大きくなると、 プロデューサーはバッチの準備が完了するのを待機するため、 Kafka の送信待ち時間が増える可能性がある => 負荷が高い場合は、スループットを向上させて待ち時間を減らすために、 バッチ サイズを増やすと、パフォーマンス向上を見込める
3)delete.enabled
* null レコード値を削除として扱うかどうか * pk.mode を record_key に設定する必要がある * デフォルト: false
【3】データマッピング
1)table.name.format
* マップ先テーブル名のフォーマット制御文字列 * マップ元のトピック名を表すプレースホルダーとして「${topic}」を含めることができる * 例:トピック「orders」の kafka_${topic} は、テーブル名「kafka_orders」にマップされる * デフォルト: ${topic}
2)pk.mode
* プライマリキーモード * 指定可能な値: [none、kafka、record_key、record_value]
[1] none
* キー指定しない
[2] kafka
* Apache Kafka® 座標???を PK として使用
[3] record_key
* レコードキーのフィールドを使用
[4] record_value
* レコード値のフィールドを使用
3)pk.fields
* プライマリキーのフィールド名のコンマ区切りのリスト * pk.mode に依存
[1] none
* 何もしない
[2] kafka
* Kafka 座標を表す 3 つの値である必要がある * 空の場合はデフォルトで __connect_topic,__connect_partition,__connect_offset になる
[3] record_key
* 指定されている場合は、目的のフィールドの抽出に使用 * 空の場合、キー構造体のすべてのフィールドが利用
[4] record_value
* 指定されている場合は、目的のフィールドの抽出に使用 * 空の場合は、値構造体からすべてのフィールドが利用
4)fields.whitelist
* レコード値フィールド名のコンマ区切りのリスト * リストを設定した場合は、目的のフィールドのフィルター処理に使用 * 空の場合は、レコード値からすべてのフィールドが利用
5)db.timezone
* コネクターで時間ベースの値を挿入する場合に使用 * デフォルト: "UTC"
【4】DDL サポート
1)auto.create
* 送信先テーブルが存在しない場合に、 CREATE を自動的に作成するかどうかを指定 * デフォルト: false
2)auto.evolve
* レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、 ALTER を発行して自動的に追加するかどうかを指定 * デフォルト: false
3)quote.sql.identifiers
* SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定
【5】再試行
1)max.retries
* エラー時に再試行する最大回数 * デフォルト: 10
2)retry.backoff.ms
* エラーの後、再試行するまでの待ち時間(ミリ秒) * デフォルト: 3000
関連記事
Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ 基本編 / PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Kafka Connect ~ Rest API ~
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Apache Kafka ~ Strimzi ~
https://dk521123.hatenablog.com/entry/2023/05/08/000133
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100