■ Introduction
A quick note. I'm currently working through a Hello World of Flink / Kafka Connect / PostgreSQL in a local environment, and along the way I figured out how to debug Kafka Connect, so I'm jotting it down here for now.
【1】How to debug
# Call the following REST API
$ curl -s http://localhost:8083/connectors/demo-jdbc-sink/status/
# It spits out the error call stack, as shown in "【2】Error details" below
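A few related calls that make the output easier to work with (a sketch, assuming jq is installed; the connector name demo-jdbc-sink is from my setup):

# List all registered connectors
$ curl -s http://localhost:8083/connectors | jq .
# Show only each task's state and stack trace
$ curl -s http://localhost:8083/connectors/demo-jdbc-sink/status | jq '.tasks[] | {id, state, trace}'
# After fixing the cause, restart the failed task (task 0 here)
$ curl -s -X POST http://localhost:8083/connectors/demo-jdbc-sink/tasks/0/restart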
Postscript
* It turns out this is also covered in the official documentation:
https://docs.confluent.io/platform/7.1/connect/logging.html#stack-trace
【2】Error details
1)「Sink connector 'xxx' is configured with 'delete.enabled=false' and 'pk.mode=record_key'」
{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"}, "tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)\n\tat ・・・ Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'demo-jdbc-sink' is configured with 'delete.enabled=false' and 'pk.mode=record_key' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='demo_counter',partition=0,offset=0,timestamp=1691894781563) with a String value and string value schema.\n\tat
2)「Value schema must be of type Struct」
{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat ・・・ Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct\n\tat
3)「Connection to xxxx:xxxx refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.」
{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"}, "tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat ・・・ Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to localhost:5431 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat ・・・ org.postgresql.util.PSQLException: Connection to localhost:5431 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat
Solution
In this case the components were talking to each other inside the same Docker network, so I deleted the Kafka Connect connector settings once and registered them again so that it could connect.
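Roughly what that looks like (a sketch; the service name "postgres", the DB name, and the credentials are placeholders for my docker compose setup). Presumably the original failure happened because, from inside the Connect container, localhost points at the container itself rather than the DB container:

# Remove the broken connector, then register it again with a reachable JDBC URL
$ curl -s -X DELETE http://localhost:8083/connectors/demo-jdbc-sink
$ curl -s -X POST http://localhost:8083/connectors \
    -H "Content-Type: application/json" \
    -d '{
          "name": "demo-jdbc-sink",
          "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:postgresql://postgres:5432/demo_db",
            "connection.user": "postgres",
            "connection.password": "password",
            "topics": "demo_counter",
            "auto.create": "true"
          }
        }'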
4)「Value schema must be of type Struct」
{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"}, "tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat ・・・ java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct\n\tat
5)「JsonConverter with schemas.enable requires "schema" and "payload" fields・・・」
{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"}, "tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: ・・・ java.base/java.lang.Thread.run(Thread.java:829)\n Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat
6)「Unrecognized token 'xxxx': was expecting」
{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"}, "tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083", "trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: \n\tat ・・・ com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'testkey': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [Source: (byte[])\"testkey\"; line: 1, column: 8]\n\tat
7)「Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class」
name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"}, "tasks":[{"id":0,"sta"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to u\tat java.base/java.lang.Thread.run(Thread.java:829)\n Caused by: java.llass java.lang.Long cannot be cast to class org.apache.kafka.connect.data.Struct (java.lang.Long is in moduootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')\n\tat
Related articles
Kafka Connect ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Rest API ~
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Kafka Connect ~ Environment setup / Docker compose ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Apache Kafka ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Environment setup / Docker compose ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Docker compose ~ Pgweb/pgAdmin ~
https://dk521123.hatenablog.com/entry/2023/08/10/111919
Apache Flink ~ Environment setup / Docker compose ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039