【Kafka】Apache Kafka ~ 設定値 ~

■ はじめに

AWS MSK(Amazon Managed Streaming for Apache Kafka)
を使ったシステムのパフォーマンステストを行っているが
てんでダメダメだった。

そこで、Kafkaに関するパフォーマンスに関わるプロパティなどを
調べてみたので、徐々にメモ。
(物凄い種類の設定内容でとても一日じゃ調べきれない、、、)

目次

【1】Producer
 1)batch.size
 2)linger.ms
 3)acks
【2】Consumer
 1)fetch.min.bytes / fetch.max.bytes
 2)fetch.max.wait.ms
 3)heartbeat.interval.ms
 4)max.poll.interval.ms
 5)max.poll.records
 6)heartbeat.interval.ms
 7)session.timeout.ms
【3】Broker
 1)background.threads
 2)queued.max.requests
 3)message.max.bytes
 4)compression.type
【4】Topic
 1)Partition
 2)max.message.bytes
【5】Connector
 1)tasks.max
【6】AWS MSK
 1)ブローカータイプ及びパーティション数
 2)Connector capacity を調整する

【1】Producer

* データをBrokerに対して、送信する側のアプリケーション

設定値
https://kafka.apache.org/documentation.html#producerconfigs
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/producer-configuration-parameters-str

1)batch.size

* Batchの最大サイズ
* データが多く流入する場合、この値を大きくするとスループット向上

https://kafka.apache.org/documentation.html#producerconfigs_batch.size

2)linger.ms

* Batchの蓄積を待つ時間

https://kafka.apache.org/documentation.html#producerconfigs_linger.ms

3)acks

* リクエストの完了前にProducerがリーダーを受信する必要がある確認応答
 => これにより、送信されるメッセージの持続性制御
 => 「早 [acks=0]->[acks=1]->[acks=all] 遅」の順
  (ただし、データロストとのトレードオフ)

https://kafka.apache.org/documentation.html#producerconfigs_acks

acks value 説明 返信タイミング リスク
acks=0 ProducerはBrokerの応答(ack)無しでデータを渡す 即時(レスポンスを待たずにデータ送信) データロスの可能性あり
acks1 ProducerはBroker Leaderの応答を待ち、データを渡す Leader Replicaへの書き込み完了時 限定的なデータロス
acks=all(acks=-1) ProducerはBroker Leader + Replicasの応答を待ち、データを渡す Topicの最小ISR数まで複製完了時(コミット完了時) データロス無し

【2】Consumer

* データをBrokerに対して、受信する側のアプリケーション
 => なお、Consumerについては、以下のサイトも一読しておいた方がいいかも。

https://docs.confluent.io/ja-jp/platform/7.1/clients/consumer.html

設定値
https://kafka.apache.org/documentation.html#consumerconfigs
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/consumer-configuration-parameters-str

1)fetch.min.bytes / fetch.max.bytes

* 1 回のフェッチで返される最小・最大データ量
* Default: 1byte / 52428800 (50 mebibytes)

https://kafka.apache.org/documentation.html#consumerconfigs_fetch.min.bytes
https://kafka.apache.org/documentation.html#consumerconfigs_fetch.max.bytes

2)fetch.max.wait.ms

* フェッチの処理を保留する最大時間 [ms]
* Default: 500 [ms]

https://kafka.apache.org/documentation.html#consumerconfigs_fetch.max.wait.ms

3)heartbeat.interval.ms

* Consumer コーディネートへのハートビートする間隔
* Default: 3000 (3 seconds)

https://kafka.apache.org/documentation.html#connectconfigs_heartbeat.interval.ms

使用上の注意

* session.timeout.ms(Default: 10000 (10 seconds))よりも小さい値にすること
 => 通常は、session.timeout.ms の 1/3 よりも小さくする(だからDefault が3秒なんだと)

4)max.poll.interval.ms

* Consumerがグループを抜ける前の最大ポーリング間隔 [ミリ秒]
* Default:  300000 (5 minutes)

https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms

使用上の注意

* session.timeout.ms より大きくする必要がある

5)max.poll.records

* 1回のpoll() 呼出で取得する最大レコード数
* Default:  500

6)heartbeat.interval.ms

* Consumer コーディネータへのハートビート間隔
* Default:  3000 (3 seconds)

https://kafka.apache.org/documentation.html#consumerconfigs_heartbeat.interval.ms

7)session.timeout.ms

* consumer が、consumer グループのメンバーであり続けるために、
 ハートビートした際に受信しなくてはならない時間 [ミリ秒]
* Default:  45000 (45 seconds)

https://kafka.apache.org/documentation.html#consumerconfigs_session.timeout.ms

メモ

* ハートビートを逃したことにより再調整(Rebalance)が
 頻繁に発生する場合、この値を増やすのも手

【3】Broker

* Kafkaクラスタを構成するサーバ

https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/broker-configuration-parameters-str

1)background.threads

https://kafka.apache.org/documentation.html#brokerconfigs_background.threads

* バックグラウンドで実行するタスクのスレッド数
* Default: 10

https://kafka.apache.org/documentation.html#brokerconfigs_background.threads

2)queued.max.requests

* リクエストキューのサイズ
* Default: 500

https://kafka.apache.org/documentation.html#brokerconfigs_queued.max.requests

3)message.max.bytes

* データ受信の最大Record Batchサイズ

https://kafka.apache.org/documentation.html#brokerconfigs_message.max.bytes

4)compression.type

* メッセージをブローカーに送信する前に圧縮する
* Valid Values: 'gzip', 'snappy', 'lz4', 'zstd'

https://kafka.apache.org/documentation/#brokerconfigs_compression.type
https://learn.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-performance-tuning

効果

* データ圧縮を使用すると、ディスクに格納できるレコードの数が増加
* プロデューサーとブローカーによって使用されている圧縮形式の間で不一致がある場合、
 CPU のオーバーヘッドが増える可能性もある
* データは送信する前に圧縮し、処理する前に圧縮を解除する必要があるので
 そこでパフォーマンス低下する可能性もある

圧縮コーデック gzip と snappy

* gzip のほうが圧縮率が高いため、
 CPU 負荷は高くなるが、ディスク使用量は少なくなる
* snappy は圧縮率は低く、CPU のオーバーヘッドは少ない
*  gzip は snappy に比べて、5 倍の速度でデータを圧縮可能

【4】Topic

https://kafka.apache.org/documentation.html#topicconfigs
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/topic-configuration-parameters-str

1)Partition

* 1 Topic の Partition数
* パーティションの数をコンシューマーの数と同じに設定するのがいい
 => コンシューマーの数がパーティションの数より少ない場合、
 一部のコンシューマーは複数のパーティションから読み取りを行い、
 コンシューマーの待機時間が延びる
 => コンシューマーの数がパーティションの数を超える場合、
 コンシューマーはアイドル状態になるため、コンシューマー リソースを無駄に消費

効果

* 各Consumer スレッドは、メッセージを 1 つのPartitionから読み取るので、
 複数Partitionを指定することで、並列処理が可能になる
* ただし、Broker ごとのPartitionの数を増やすと、
 スループット(単位時間あたりに処理できるデータ量)が低下し、
 Topic が使用不可になる場合もある

2)max.message.bytes

* データ受信の最大Record Batchサイズ

https://kafka.apache.org/documentation.html#topicconfigs_max.message.bytes

【5】Connector

https://kafka.apache.org/documentation.html#connect_configuring
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/kafka-connect-configuration-parameters-str

1)tasks.max

* タスクの最大数
 => 通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じ
 => 以下のサイトから、リクエストの同時実行数に関わる模様。

Kafka ConnectがTasksを分散する様子 - ujunのブログ

メモ:Snowflake
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#optional-properties

より抜粋
~~~~~~~~~~~~~
この数は、低くまたは高く設定できます。
ただし、Snowflakeは高く設定することをお勧めしません。
~~~~~~~~~~~~~

2)offset.storage.topic

* コネクターオフセット(※1)を保存する Kafka トピック

https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q3/html/using_amq_streams_on_openshift/proc-configuring-kafka-connect-user-authorization-str
※1:Offset(オフセット)

* パーティション内のメッセージの位置を表す
 => 特定のパーティションの各メッセージには一意のオフセットがあり、
  パーティション内のコンシューマーの位置を特定して、
  消費したレコード数を追跡するのに役立つ

https://access.redhat.com/documentation/ja-jp/red_hat_amq/7.6/html/amq_streams_on_openshift_overview/kafka-concepts_str

3)config.storage.topic

* コネクターおよびタスクステータスの設定を保存する Kafka トピック

4)status.storage.topic

* 

【6】AWS MSK

1)ブローカータイプ及びパーティション

* ブローカータイプを上げるのも一つの手
* ブローカータイプごとの推奨パーティション数を指定する

https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/bestpractices.html

ブローカータイプ 推奨されるパーティションの数
kafka.t3.small 300
kafka.m5.large/kafka.m5.xlarge 1000
kafka.m5.2xlarge 2000
その他 4000

2)Connector capacity を調整する

* 以下の Connector capacity (コネクタの容量) を調整する
 + [MSK Connect Unit (MCU) count per worker] (ワーカーあたりの MSK Connect Unit (MCU) 数)
  => 各 MCU は、1 vCPU のコンピューティングと 4 GB のメモリを提供
 + [minimum number of workers] (ワーカーの最小数) 
 +  [maximum number of workers] (ワーカーの最大数)
 +  [Autoscaling utilization thresholds] (オートスケーリング使用率のしきい値) 
  => min/maxは、1-10の範囲で指定。
  => min/maxは、min < max でなければならない(つまり同じ値もダメ)
  => オートスケーリングをトリガーする MCU 消費の上限と下限の
  ターゲット使用率のしきい値 (%)

https://aws.amazon.com/jp/blogs/news/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/

参考文献

https://qiita.com/sigmalist/items/3b512e2ab49b07271665
https://zenn.dev/amezousan/scraps/7df6c1d21d8600
https://tech-lab.sios.jp/archives/32130
https://cloud.ibm.com/docs/EventStreams?topic=EventStreams-consuming_messages&locale=ja

関連記事

Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ Kafkaコマンド ~
https://dk521123.hatenablog.com/entry/2023/05/16/000000
Apache Kafka ~ Kafka Connect ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Kafka Connect / PostgreSQL
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Apache Kafka ~ Strimzi ~
https://dk521123.hatenablog.com/entry/2023/05/08/000133
ScalaApache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
ScalaApache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Amazon MSK ~ AWS CLI
https://dk521123.hatenablog.com/entry/2023/05/26/000000
Terraform ~ AWS MSK ~
https://dk521123.hatenablog.com/entry/2023/05/14/122215
Terraform ~ AWS MSK Connect ~
https://dk521123.hatenablog.com/entry/2023/05/25/000000
Terraform ~ AWS EC2 ~
https://dk521123.hatenablog.com/entry/2023/05/21/003048
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100