【Kafka】Confluent ~ REST API ~

■ はじめに

Confluent の Kafka を使うことになり
TopicのREST API で作ることになりそうなので
調べてみた

目次

【1】Confluent の REST API
 1)REST Proxy API のバージョン
【2】発生するエラー一覧
【3】Kafka cluster
 1)Kafka cluster一覧表示
 2)Kafka cluster表示
【4】Topic
 1)Topic作成
 2)Topic一覧表示
 3)Topic表示

【1】Confluent の REST API

1)REST Proxy API のバージョン

* v2 と v3 がある

https://docs.confluent.io/ja-jp/platform/7.1/kafka-rest/api.html#content-types

Key v2 v3
コンテンツタイプ application/vnd.kafka.json.v2+json / application/vnd.kafka.binary.v2+json application/json

【2】発生するエラー一覧

* 以下に記載されている。(表としてまとめておく)

https://docs.confluent.io/ja-jp/platform/7.1/kafka-rest/api.html#errors

Status Code Error Messages Explanations
401 401 Unauthorized エラーコード 40101 -- Kafka 認証エラー
403 403 Forbidden エラーコード 40301 -- Kafka 認可エラー
404 404 Not Found エラーコード 40401 -- トピックが見つかりません
404 404 Not Found エラーコード 40402 -- パーティションが見つかりません。
422 422 Unprocessable Entity リクエストのペイロードが正しくフォーマットされていないか、セマンティクスエラーが含まれています
500 500 Internal Server Error Error code 50001 -- Zookeeper error.
500 500 Internal Server Error Error code 50002 -- Kafka error.
500 500 Internal Server Error Error code 50003 -- Retriable Kafka error. Although the operation failed, it's possible that retrying the request will be successful.
500 500 Internal Server Error エラーコード 50101 -- 指定されたブローカーについてのみ SSL エンドポイントが見つかりましたが、呼び出された API については SSL はまだサポートされていません。

【3】Kafka cluster

* Topicを作成するには、Kafka cluster ID が必要なので
 そのIDを取得するREST APIをまずは取り上げる

https://docs.confluent.io/ja-jp/platform/7.1/kafka-rest/api.html#cluster-v3

1)Kafka cluster一覧表示

構文

# Kafka クラスタ一覧表示
curl --silent -X GET http://<REST-endpoint>/kafka/v3/clusters | jq

# Kafka クラスタIDを取得する
curl --silent -X GET http://<REST-endpoint>/kafka/v3/clusters | jq -r '.data[] | .cluster_id'
Parameters Explanations Example
<REST-endpoint> Confluent の REST のホスト名およびポート番号 pkc-abcde.us-west4.gcp.confluent.cloud:443

2)Kafka cluster表示

構文

# Kafka クラスタ情報表示
curl --silent -X GET http://<REST-endpoint>/kafka/v3/clusters/<cluster-id> | jq
Parameters Explanations Example
<REST-endpoint> Confluent の REST のホスト名およびポート番号 pkc-abcde.us-west4.gcp.confluent.cloud:443
<cluster-id> Kafka クラスター ID lkc-vo9pz

【4】Topic

Parameters Explanations Example
<BASE64-encoded-key-and-secret> Basic 認証用のトーク ABC123...
<REST-endpoint> Confluent の REST のホスト名およびポート番号 pkc-abcde.us-west4.gcp.confluent.cloud:443
<cluster-id> Kafka クラスター ID lkc-vo9pz
<topic-name> 作成対象のTOPIC名 testTopic1
<Partitions-count> パーティション 5
<Replication-factor> レプリケーション 3

1)Topic作成

https://docs.confluent.io/ja-jp/platform/7.1/kafka-rest/api.html#create-a-topic
https://docs.confluent.io/cloud/current/kafka-rest/krest-qs.html#step-4-create-a-topic-using-cluster-administration-for-the-ak-rest-api

構文

curl -H "Authorization: Basic <BASE64-encoded-key-and-secret>" -H 'Content-Type: application/json' \
--request POST --url 'https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics' \
-d '{"topic_name": "<topic-name>", "partitions_count": <Partitions-count>, "replication_factor": <Replication-factor>}'

curl -H "Authorization: Basic ABC123ABC" -H 'Content-Type: application/json' \
--request POST --url 'https://pkc-abcde.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics' \
-d '{"topic_name": "testTopic1", "partitions_count": 5, "replication_factor": 3}'

2)Topic一覧表示

構文

# Topic一覧表示
curl --silent -X GET http://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics | jq

# Topic名のみ表示
curl --silent -X GET http://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics | jq -r '.data[] | .topic_name'

# 【補足】
# 公式ドキュメントに「jq | grep '.topic_name'」で紹介されているが、
# 値だけ欲しいなら上記でフィルタリングした方がいい

curl --silent -X GET https://pkc-abcde.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics | jq

3)Topic表示

構文

curl --silent -X GET http://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name> | jq

curl --silent -X GET https://pkc-abcde.us-west4.gcp.confluent.cloud:443/kafka/v3/clusters/lkc-vo9pz/topics/test_topic | jq

参考文献

https://docs.confluent.io/cloud/current/kafka-rest/krest-qs.html

関連記事

Confluent ~ Local環境構築 ~
https://dk521123.hatenablog.com/entry/2024/05/10/000325
Confluent ~ Confluent CLI
https://dk521123.hatenablog.com/entry/2024/05/12/000159
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 環境構築 / Linux編 ~
https://dk521123.hatenablog.com/entry/2024/02/05/153202
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Rest API
https://dk521123.hatenablog.com/entry/2023/05/31/000000
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100
jq コマンド ~ JSON を扱う ~
https://dk521123.hatenablog.com/entry/2020/02/01/000000