■ はじめに
AWS MSK(Amazon Managed Streaming for Apache Kafka) を使ったシステムのパフォーマンステストを行っているが てんでダメダメだった。 そこで、Kafkaに関するパフォーマンスに関わるプロパティなどを 調べてみたので、徐々にメモ。 (物凄い種類の設定内容でとても一日じゃ調べきれない、、、)
目次
【1】Producer 1)batch.size 2)linger.ms 3)acks 【2】Consumer 1)fetch.min.bytes / fetch.max.bytes 2)fetch.max.wait.ms 3)heartbeat.interval.ms 4)max.poll.interval.ms 5)max.poll.records 6)heartbeat.interval.ms 7)session.timeout.ms 【3】Broker 1)background.threads 2)queued.max.requests 3)message.max.bytes 4)compression.type 【4】Topic 1)Partition 2)max.message.bytes 【5】Connector 1)tasks.max 【6】AWS MSK 1)ブローカータイプ及びパーティション数 2)Connector capacity を調整する
【1】Producer
* データをBrokerに対して、送信する側のアプリケーション
設定値
https://kafka.apache.org/documentation.html#producerconfigs
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/producer-configuration-parameters-str
1)batch.size
* Batchの最大サイズ * データが多く流入する場合、この値を大きくするとスループット向上
https://kafka.apache.org/documentation.html#producerconfigs_batch.size
2)linger.ms
* Batchの蓄積を待つ時間
https://kafka.apache.org/documentation.html#producerconfigs_linger.ms
3)acks
* リクエストの完了前にProducerがリーダーを受信する必要がある確認応答 => これにより、送信されるメッセージの持続性制御 => 「早 [acks=0]->[acks=1]->[acks=all] 遅」の順 (ただし、データロストとのトレードオフ)
https://kafka.apache.org/documentation.html#producerconfigs_acks
acks value | 説明 | 返信タイミング | リスク |
---|---|---|---|
acks=0 | ProducerはBrokerの応答(ack)無しでデータを渡す | 即時(レスポンスを待たずにデータ送信) | データロスの可能性あり |
acks1 | ProducerはBroker Leaderの応答を待ち、データを渡す | Leader Replicaへの書き込み完了時 | 限定的なデータロス |
acks=all(acks=-1) | ProducerはBroker Leader + Replicasの応答を待ち、データを渡す | Topicの最小ISR数まで複製完了時(コミット完了時) | データロス無し |
【2】Consumer
* データをBrokerに対して、受信する側のアプリケーション => なお、Consumerについては、以下のサイトも一読しておいた方がいいかも。
https://docs.confluent.io/ja-jp/platform/7.1/clients/consumer.html
設定値
https://kafka.apache.org/documentation.html#consumerconfigs
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/consumer-configuration-parameters-str
1)fetch.min.bytes / fetch.max.bytes
* 1 回のフェッチで返される最小・最大データ量 * Default: 1byte / 52428800 (50 mebibytes)
https://kafka.apache.org/documentation.html#consumerconfigs_fetch.min.bytes
https://kafka.apache.org/documentation.html#consumerconfigs_fetch.max.bytes
2)fetch.max.wait.ms
* フェッチの処理を保留する最大時間 [ms] * Default: 500 [ms]
https://kafka.apache.org/documentation.html#consumerconfigs_fetch.max.wait.ms
3)heartbeat.interval.ms
* Consumer コーディネートへのハートビートする間隔 * Default: 3000 (3 seconds)
https://kafka.apache.org/documentation.html#connectconfigs_heartbeat.interval.ms
使用上の注意
* session.timeout.ms(Default: 10000 (10 seconds))よりも小さい値にすること => 通常は、session.timeout.ms の 1/3 よりも小さくする(だからDefault が3秒なんだと)
4)max.poll.interval.ms
* Consumerがグループを抜ける前の最大ポーリング間隔 [ミリ秒] * Default: 300000 (5 minutes)
https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms
使用上の注意
* session.timeout.ms より大きくする必要がある
5)max.poll.records
* 1回のpoll() 呼出で取得する最大レコード数 * Default: 500
6)heartbeat.interval.ms
* Consumer コーディネータへのハートビート間隔 * Default: 3000 (3 seconds)
https://kafka.apache.org/documentation.html#consumerconfigs_heartbeat.interval.ms
7)session.timeout.ms
* consumer が、consumer グループのメンバーであり続けるために、 ハートビートした際に受信しなくてはならない時間 [ミリ秒] * Default: 45000 (45 seconds)
https://kafka.apache.org/documentation.html#consumerconfigs_session.timeout.ms
メモ
* ハートビートを逃したことにより再調整(Rebalance)が 頻繁に発生する場合、この値を増やすのも手
【3】Broker
* Kafkaクラスタを構成するサーバ
1)background.threads
https://kafka.apache.org/documentation.html#brokerconfigs_background.threads
* バックグラウンドで実行するタスクのスレッド数 * Default: 10
https://kafka.apache.org/documentation.html#brokerconfigs_background.threads
2)queued.max.requests
* リクエストキューのサイズ * Default: 500
https://kafka.apache.org/documentation.html#brokerconfigs_queued.max.requests
3)message.max.bytes
* データ受信の最大Record Batchサイズ
https://kafka.apache.org/documentation.html#brokerconfigs_message.max.bytes
4)compression.type
* メッセージをブローカーに送信する前に圧縮する * Valid Values: 'gzip', 'snappy', 'lz4', 'zstd'
https://kafka.apache.org/documentation/#brokerconfigs_compression.type
https://learn.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-performance-tuning
効果
* データ圧縮を使用すると、ディスクに格納できるレコードの数が増加 * プロデューサーとブローカーによって使用されている圧縮形式の間で不一致がある場合、 CPU のオーバーヘッドが増える可能性もある * データは送信する前に圧縮し、処理する前に圧縮を解除する必要があるので そこでパフォーマンス低下する可能性もある
圧縮コーデック gzip と snappy
* gzip のほうが圧縮率が高いため、 CPU 負荷は高くなるが、ディスク使用量は少なくなる * snappy は圧縮率は低く、CPU のオーバーヘッドは少ない * gzip は snappy に比べて、5 倍の速度でデータを圧縮可能
【4】Topic
https://kafka.apache.org/documentation.html#topicconfigs
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/topic-configuration-parameters-str
1)Partition
* 1 Topic の Partition数 * パーティションの数をコンシューマーの数と同じに設定するのがいい => コンシューマーの数がパーティションの数より少ない場合、 一部のコンシューマーは複数のパーティションから読み取りを行い、 コンシューマーの待機時間が延びる => コンシューマーの数がパーティションの数を超える場合、 コンシューマーはアイドル状態になるため、コンシューマー リソースを無駄に消費
効果
* 各Consumer スレッドは、メッセージを 1 つのPartitionから読み取るので、 複数Partitionを指定することで、並列処理が可能になる * ただし、Broker ごとのPartitionの数を増やすと、 スループット(単位時間あたりに処理できるデータ量)が低下し、 Topic が使用不可になる場合もある
2)max.message.bytes
* データ受信の最大Record Batchサイズ
https://kafka.apache.org/documentation.html#topicconfigs_max.message.bytes
【5】Connector
https://kafka.apache.org/documentation.html#connect_configuring
https://access.redhat.com/documentation/ja-jp/red_hat_amq/2021.q2/html/using_amq_streams_on_rhel/kafka-connect-configuration-parameters-str
1)tasks.max
* タスクの最大数 => 通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じ => 以下のサイトから、リクエストの同時実行数に関わる模様。
Kafka ConnectがTasksを分散する様子 - ujunのブログ
メモ:Snowflake
https://docs.snowflake.com/ja/user-guide/kafka-connector-install#optional-properties
より抜粋 ~~~~~~~~~~~~~ この数は、低くまたは高く設定できます。 ただし、Snowflakeは高く設定することをお勧めしません。 ~~~~~~~~~~~~~
2)offset.storage.topic
* コネクターオフセット(※1)を保存する Kafka トピック
* パーティション内のメッセージの位置を表す => 特定のパーティションの各メッセージには一意のオフセットがあり、 パーティション内のコンシューマーの位置を特定して、 消費したレコード数を追跡するのに役立つ
3)config.storage.topic
* コネクターおよびタスクステータスの設定を保存する Kafka トピック
4)status.storage.topic
*
【6】AWS MSK
1)ブローカータイプ及びパーティション数
* ブローカータイプを上げるのも一つの手 * ブローカータイプごとの推奨パーティション数を指定する
https://docs.aws.amazon.com/ja_jp/msk/latest/developerguide/bestpractices.html
ブローカータイプ | 推奨されるパーティションの数 |
---|---|
kafka.t3.small | 300 |
kafka.m5.large/kafka.m5.xlarge | 1000 |
kafka.m5.2xlarge | 2000 |
その他 | 4000 |
2)Connector capacity を調整する
* 以下の Connector capacity (コネクタの容量) を調整する + [MSK Connect Unit (MCU) count per worker] (ワーカーあたりの MSK Connect Unit (MCU) 数) => 各 MCU は、1 vCPU のコンピューティングと 4 GB のメモリを提供 + [minimum number of workers] (ワーカーの最小数) + [maximum number of workers] (ワーカーの最大数) + [Autoscaling utilization thresholds] (オートスケーリング使用率のしきい値) => min/maxは、1-10の範囲で指定。 => min/maxは、min < max でなければならない(つまり同じ値もダメ) => オートスケーリングをトリガーする MCU 消費の上限と下限の ターゲット使用率のしきい値 (%)
参考文献
https://qiita.com/sigmalist/items/3b512e2ab49b07271665
https://zenn.dev/amezousan/scraps/7df6c1d21d8600
https://tech-lab.sios.jp/archives/32130
https://cloud.ibm.com/docs/EventStreams?topic=EventStreams-consuming_messages&locale=ja
関連記事
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ Kafkaコマンド ~
https://dk521123.hatenablog.com/entry/2023/05/16/000000
Apache Kafka ~ Kafka Connect ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Kafka Connect / PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Apache Kafka ~ Strimzi ~
https://dk521123.hatenablog.com/entry/2023/05/08/000133
Scala ~ Apache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
Scala ~ Apache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Amazon MSK ~ AWS CLI ~
https://dk521123.hatenablog.com/entry/2023/05/26/000000
Terraform ~ AWS MSK ~
https://dk521123.hatenablog.com/entry/2023/05/14/122215
Terraform ~ AWS MSK Connect ~
https://dk521123.hatenablog.com/entry/2023/05/25/000000
Terraform ~ AWS EC2 ~
https://dk521123.hatenablog.com/entry/2023/05/21/003048
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100