【Kafka】Kafka Connect ~ Rest API ~

■ はじめに

Kafka Connect で 期待した通りのパフォーマンスがでないので
どう調査するのだろうって調べてた過程で、Rest API を見つけたので
メモしておく。

目次

【1】Kafka connect の Rest API 一覧
 1)おさらい:curlコマンド
【2】コネクタ
 1)コネクタの登録
 2)コネクタ一覧表示
 3)コネクタの状況を確認
 4)コネクタの構成/タスク/タイプ
 5)コネクタの削除
【3】ログ
 1)ログレベルの確認
 2)ログレベルの変更

【1】Kafka connect の Rest API 一覧

* 以下に全てのRest API 一覧を見ることができる

https://docs.confluent.io/platform/7.1/connect/references/restapi.html
https://docs.confluent.io/ja-jp/platform/7.1/connect/monitoring.html

1)おさらい:curlコマンド

* curlコマンド の詳細については、以下の関連記事を参照のこと

curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100

【2】コネクタ

1)コネクタの登録

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{ "name": "demo-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": 1, "connection.url": "jdbc:postgresql://localhost:5431/demo_db", "connection.user": "postgres", "connection.password": "password", "insert.mode": "insert", "auto.create": "true", "topics": "demo_counter" } }' \
  http://localhost:8083/connectors

2)コネクタ一覧表示

curl -s http://localhost:8083/connectors/

# -s, --silent: 進捗やエラーを表示しない

3)コネクタの状況を確認

# curl -s http://localhost:8083/connectors/[CONNECT_NAME]/status/
curl -s http://localhost:8083/connectors/demo-jdbc-sink/status/

# -s, --silent: 進捗やエラーを表示しない

4)コネクタの構成/タスク/タイプ

# curl localhost:8083/connectors/[CONNECT_NAME]/tasks
curl localhost:8083/connectors/demo-jdbc-sink/tasks | jq

出力例

[
  {
    "id": {
      "connector": "demo-jdbc-sink",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.password": "password",
      "tasks.max": "1",
      "topics": "demo_counter",
      "key.converter.schemas.enable": "false",
      "delete.enabled": "true",
      "auto.evolve": "true",
      "task.class": "io.confluent.connect.jdbc.sink.JdbcSinkTask",
      "connection.user": "postgres",
      "value.converter.schemas.enable": "false",
      "name": "demo-jdbc-sink",
      "auto.create": "true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "connection.url": "jdbc:postgresql://postgresql:5432/demo_db",
      "insert.mode": "upsert",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "pk.mode": "record_key",
      "pk.fields": "word"
    }
  }
]

構成のみ

# curl localhost:8083/connectors/[CONNECT_NAME]/config
curl localhost:8083/connectors/demo-jdbc-sink/config | jq

5)コネクタの削除

https://docs.confluent.io/platform/7.1/connect/references/restapi.html#delete--connectors-(string-name)-

# DELETE /connectors/(string:name)/
curl -X DELETE -s http://localhost:8083/connectors/demo-jdbc-sink

【3】ログ

https://developer.confluent.io/learn-kafka/kafka-connect/troubleshooting-kafka-connect/#dynamic-log-configuration

1)ログレベルの確認

curl -s http://localhost:8083/admin/logger/
# -s, --silent: 進捗やエラーを表示しない

# jq がインストールされている場合
curl -s http://localhost:8083/admin/logger/ | jq

https://docs.confluent.io/platform/7.1/connect/logging.html#check-log-levels

2)ログレベルの変更

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "TRACE"}' | jq '.'

https://docs.confluent.io/platform/7.1/connect/logging.html#change-the-log-level-for-a-specific-logger
補足

# -X: HTTPメソッド(GET/POST/PUT/DELETE etc)を指定
# -H: Request Headerを追加(Content-Typeを指定する場合)
# -d: POST でフォームを送信する。引数で送信するコンテンツを指定する

出力結果

[
  "org.apache.kafka.connect.runtime.WorkerSourceTask"
]

参考文献

https://developer.confluent.io/learn-kafka/kafka-connect/troubleshooting-kafka-connect/
https://docs.confluent.io/ja-jp/platform/7.1/connect/monitoring.html

関連記事

Kafka Connect ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ 基本編 / PostgreSQL
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Kafka Connect ~ Connector 構成プロパティ ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131
Kafka Connect での デバッグ方法
https://dk521123.hatenablog.com/entry/2023/08/15/215636 Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Amazon MSK ~ AWS CLI
https://dk521123.hatenablog.com/entry/2023/06/03/003941
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100