【Kafka】Kafka Connect ~ Basics / PostgreSQL ~

■ Introduction

https://dk521123.hatenablog.com/entry/2023/04/29/185133

This post is a continuation of the article above.

This time, we try out Kafka Connect with PostgreSQL.

Table of Contents

【1】What we will do
【2】Preparation before running
 1)SQL to run
 2)JDBC plugin setup
 3)Start ZooKeeper
 4)Start the Kafka broker
 5)Check the topic
 6)Start Kafka Connect
【3】Source configuration
 1)Create the Source configuration file
 2)Start Kafka Connect as a Source
 3)Verify
【4】Sink configuration
 1)SQL to run
 2)Create the Sink configuration file
 3)Start Kafka Connect as a Sink
 4)Insert data
 5)Verify
【5】Cleanup
 1)Delete the connectors

【1】What we will do

[Source/PostgreSQL] -> [Kafka] -> [Sink/PostgreSQL]

【2】Preparation before running

1)SQL to run

CREATE DATABASE sample_db;

-- Connect to the newly created database first (e.g. \c sample_db in psql)
CREATE TABLE person (
  id bigint PRIMARY KEY,
  name varchar(256)
);

CREATE USER connect_user with password 'password';
GRANT ALL ON person TO connect_user;

INSERT INTO person(id, name) VALUES (1, 'Mike');
INSERT INTO person(id, name) VALUES (2, 'Smith');
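
For reference, one way to run the SQL above from the command line (a sketch; it assumes psql is on PATH, PostgreSQL is running locally on the default port, and the statements after CREATE DATABASE were saved to a hypothetical file setup_source.sql):

# Create the database, then run the remaining statements against sample_db
psql -U postgres -c "CREATE DATABASE sample_db;"
psql -U postgres -d sample_db -f setup_source.sql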

2)JDBC plugin setup

[a] JDBC Connector

[1] Download the module from the site below
 => In this case, "confluentinc-kafka-connect-jdbc-10.7.1.zip"

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

[2] Unzip the downloaded ZIP file and place
 the Kafka Connect JDBC module under Kafka's "libs" directory
 => In this case, place
 "confluentinc-kafka-connect-jdbc-10.7.1\lib\kafka-connect-jdbc-10.7.1.jar"
  into "【Kafka Home】\libs"

[b] PostgreSQL JDBC Driver

[1] Download the driver from the site below
 => In this case, "postgresql-42.6.0.jar"

https://jdbc.postgresql.org/download/

[2] Place the PostgreSQL JDBC driver under Kafka's "libs" directory
 => In this case, place "postgresql-42.6.0.jar"
  into "【Kafka Home】\libs"
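
To double-check that both JARs are actually under libs, listing the directory should show them (a sketch; C:\kafka stands in for 【Kafka Home】 and is an assumption, not something stated in this article):

dir C:\kafka\libs | findstr /i "jdbc postgresql"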

3)Start ZooKeeper

# Start the ZooKeeper service
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

4)Start the Kafka broker

# Start the Kafka broker service
.\bin\windows\kafka-server-start.bat config\server.properties

5)Check the topic

# Check the topic "connect-test"
bin\windows\kafka-topics.bat --describe --topic connect-test --bootstrap-server localhost:9092

# If the topic has not been created yet, create "connect-test"
# bin\windows\kafka-topics.bat --create --topic connect-test --bootstrap-server localhost:9092

6)Start Kafka Connect

bin\windows\connect-distributed.bat config\connect-distributed.properties
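
Once the worker is up, the Kafka Connect REST API should respond on port 8083. Checking that the JDBC connector plugins were picked up is a quick sanity test (an extra check, not part of the original steps):

curl http://localhost:8083/
curl http://localhost:8083/connector-plugins
# The second response should include io.confluent.connect.jdbc.JdbcSourceConnector and JdbcSinkConnector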

【3】Source configuration

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

The configuration below is based on the page above. Main settings:

key : explanation
mode : bulk, timestamp, incrementing, timestamp+incrementing
incrementing.column.name : when mode=incrementing, the strictly increasing column used to detect new rows
table.whitelist : target table names, comma-separated (e.g. "table.whitelist": "User, Address, Email")
topic.prefix : prefix for the topic name (the topic becomes "{topic.prefix}{table name}")
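
For illustration only (not used in this article's config): with a fragment like the one below, rows from the person table would be published to a topic named "pg-person", because the prefix is simply prepended to the table name.

  "topic.prefix": "pg-",
  "table.whitelist": "person"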

1)Create the Source configuration file

source-postgresql-demo-data.json

{
  "name": "source-postgresql-demo-data",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", 
    "connection.url" : "jdbc:postgresql://localhost:5432/sample_db",
    "connection.user" : "connect_user",
    "connection.password" : "password",
    "mode" : "incrementing",
    "incrementing.column.name" : "id",
    "table.whitelist" : "person",
    "tasks.max" : "1"
  }
}

2)Start Kafka Connect as a Source

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @source-postgresql-demo-data.json
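
If a connector with the same name is already registered, POSTing again fails; in that case the settings can be updated with PUT against the config sub-resource instead (a sketch; source-config-only.json is a hypothetical file containing only the inner "config" object):

curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/source-postgresql-demo-data/config -d @source-config-only.json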

3)Verify

[1] Check the connector list

curl http://localhost:8083/connectors
# ["source-postgresql-demo-data"] が表示

[2] Check the connector status

curl http://localhost:8083/connectors/source-postgresql-demo-data/status

{"name":"source-postgresql-demo-data","connector":{"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"}],"type":"source"}

[3] Start a consumer (check that the data is there)

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic person --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"person"},"payload":{"id":1,"name":"Mike"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"person"},"payload":{"id":2,"name":"Smith"}}

【4】Sink configuration

1)SQL to run

CREATE DATABASE sample_db_clone;
CREATE USER connect_user_for_clone with password 'password';

-- Connect to the newly created database first (e.g. \c sample_db_clone in psql)
CREATE TABLE person (
  id bigint PRIMARY KEY,
  name varchar(256)
);
GRANT ALL ON person TO connect_user_for_clone;

2)Create the Sink configuration file

https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

The configuration below is based on the pages above.

sink-postgresql-demo-data.json

{
  "name": "sink-postgresql-demo-data",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/sample_db_clone",
    "connection.user": "connect_user_for_clone",
    "connection.password": "password",
    "insert.mode": "insert",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "auto.create": "false",
    "topics": "person"
  }
}
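
Note: with "insert.mode": "insert", re-delivered records would hit the primary key constraint and fail. The JDBC sink also supports upsert, so a variant like the following fragment is possible (a sketch, not part of the original config):

    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id",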

3)Start Kafka Connect as a Sink

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @sink-postgresql-demo-data.json

[1] Check the connector list

curl http://localhost:8083/connectors
# ["source-postgresql-demo-data","sink-postgresql-demo-data"] が表示

[2] Check the connector status

curl http://localhost:8083/connectors/sink-postgresql-demo-data/status

{"name":"simk-postgresql-demo-data","connector":{"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"}],"type":"sink"}

4)Insert data

Run the following against the source DB (sample_db); the new rows should then flow through Kafka into sample_db_clone.

INSERT INTO person(id, name) VALUES (3, 'Yom');
INSERT INTO person(id, name) VALUES (4, 'Kebin');
INSERT INTO person(id, name) VALUES (5, 'Tim');
INSERT INTO person(id, name) VALUES (6, 'Ken');

5)Verify

[1] Check that the data has arrived in the sink DB (sample_db_clone)
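
A query like the following against sample_db_clone should return all six rows, including the ones inserted after the sink connector was registered (the query itself is an addition for illustration):

SELECT * FROM person ORDER BY id;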

"id","name"
"1","Mike"
"2","Smith"
"3","Yom"
"4","Kebin"
"5","Tim"
"6","Ken"

【5】Cleanup

1)Delete the connectors

curl -X DELETE http://localhost:8083/connectors/source-postgresql-demo-data

curl -X DELETE http://localhost:8083/connectors/sink-postgresql-demo-data
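
After deleting both, the connector list should come back empty (an extra check, not part of the original steps):

curl http://localhost:8083/connectors
# [] should be returned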

References

https://tjtjtj.hatenablog.com/entry/2019/11/26/230000
https://tutuz-tech.hatenablog.com/entry/2019/03/21/000835

Related articles

Kafka Connect ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Connector configuration properties ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131
Kafka Connect ~ REST API ~
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Apache Kafka ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Environment setup / Docker Compose ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ Introduction ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Amazon MSK ~ Introduction ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ Basics / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Scala ~ Apache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
Scala ~ Apache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737