【Kafka】Kafka Connect での デバッグ方法

■ はじめに

小ネタ。

現在、ローカル環境で、Flink/Kafka connect/PostgreSQLの
Hello worldを行っているのだが、その際のデバッグ方法が分かったので
一旦メモしておく

【1】デバッグ方法

# 以下のREST APIを呼び出す
$ curl -s http://localhost:8083/connectors/demo-jdbc-sink/status/

# 以下「【2】エラー内容」のようにエラーのコールスタックをはいてくれる

後日談

* 以下の公式ドキュメントにも載ってた

https://docs.confluent.io/platform/7.1/connect/logging.html#stack-trace

【2】エラー内容

1)「Sink connector 'xxx' is configured with 'delete.enabled=false' and 'pk.mode=record_key'」

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},
"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: 
Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)\n\tat
・・・
Caused by: org.apache.kafka.connect.errors.ConnectException: 
Sink connector 'demo-jdbc-sink' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and
 therefore requires records with a non-null Struct value and non-null Struct schema,
 but found record at (topic='demo_counter',partition=0,offset=0,timestamp=1691894781563)
 with a String value and string value schema.\n\tat

2)「Value schema must be of type Struct」

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat 
・・・
Caused by: org.apache.kafka.connect.errors.ConnectException: 
Value schema must be of type Struct\n\tat

3)「Connection to xxxx:xxxx refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.」

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},
"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat 
・・・
Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: 
Connection to localhost:5431 refused. Check that the hostname and port are correct
 and that the postmaster is accepting TCP/IP connections.\n\tat 
・・・
org.postgresql.util.PSQLException:
 Connection to localhost:5431 refused. Check that the hostname and port are correct
 and that the postmaster is accepting TCP/IP connections.\n\tat

解決案

この時は、同じDocker内での通信だったので、
Kafka connectの設定を一旦消して、再度接続できるように設定しなおした

4)「Value schema must be of type Struct」

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},
"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException:
 Exiting WorkerSinkTask due to unrecoverable exception.\n\tat 
・・・
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException:
 Value schema must be of type Struct\n\tat 

5)「JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields・・・」

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},
"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException:
・・・
java.base/java.lang.Thread.run(Thread.java:829)\n
Caused by: org.apache.kafka.connect.errors.DataException:
 JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields
 and may not contain additional fields. If you are trying to deserialize plain JSON data,
 set schemas.enable=false in your converter configuration.\n\tat 

6)「Unrecognized token 'xxxx': was expecting」

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},
"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat 
java.base/java.lang.Thread.run(Thread.java:829)\n
Caused by: org.apache.kafka.connect.errors.DataException:
 Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat 
・・・
com.fasterxml.jackson.core.JsonParseException:
 Unrecognized token 'testkey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n
at [Source: (byte[])\"testkey\"; line: 1, column: 8]\n\tat 

7)「Caused by: java.llass java.lang.Long cannot be cast to class」

name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},
"tasks":[{"id":0,"sta"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException:
 Exiting WorkerSinkTask due to u\tat 
java.base/java.lang.Thread.run(Thread.java:829)\n
Caused by: java.llass java.lang.Long cannot be cast
 to class org.apache.kafka.connect.data.Struct
 (java.lang.Long is in moduootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')\n\tat

関連記事

Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Rest API
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Kafka Connect ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Docker compose ~ Pgweb/pgAdmin ~
https://dk521123.hatenablog.com/entry/2023/08/10/111919
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039