■ Introduction
https://dk521123.hatenablog.com/entry/2023/04/29/185133
This is a follow-up to the article above. This time, we'll try Kafka Connect with PostgreSQL.
Table of Contents
【1】What we'll do
【2】Preparation
 1) SQL used for setup
 2) JDBC plugin setup
 3) Start ZooKeeper
 4) Start the Kafka broker
 5) Check the topic
 6) Start Kafka Connect
【3】Source configuration
 1) Create the Source configuration file
 2) Start Kafka Connect as a Source
 3) Verify it works
【4】Sink configuration
 1) SQL used for setup
 2) Create the Sink configuration file
 3) Start Kafka Connect as a Sink
 4) Verify it works
【5】Cleanup
 1) Delete the connectors
【1】What we'll do
[Source/PostgreSQL] -> [Kafka] -> [Sink/PostgreSQL]
【2】Preparation
1) SQL used for setup
CREATE DATABASE sample_db;
-- Connect to sample_db before running the rest (e.g. \c sample_db in psql)
CREATE TABLE person (
  id bigint PRIMARY KEY,
  name varchar(256)
);
CREATE USER connect_user with password 'password';
GRANT ALL ON person TO connect_user;
INSERT INTO person(id, name) VALUES (1, 'Mike');
INSERT INTO person(id, name) VALUES (2, 'Smith');
2) JDBC plugin setup
[a] JDBC Connector
[1] Download the module from the site below => in this case, "confluentinc-kafka-connect-jdbc-10.7.1.zip"
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
[2] Unzip the downloaded ZIP file and place the JDBC connector jar under Kafka's "libs" directory => in this case, place "confluentinc-kafka-connect-jdbc-10.7.1\lib\kafka-connect-jdbc-10.7.1.jar" into "【Kafka Home】\libs"
[b] PostgreSQL JDBC Driver
[1] Download the driver from the site below => in this case, "postgresql-42.6.0.jar"
https://jdbc.postgresql.org/download/
[2] Place the PostgreSQL JDBC driver under Kafka's "libs" directory => in this case, place "postgresql-42.6.0.jar" into "【Kafka Home】\libs"
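As an aside, instead of dropping jars into libs, a Connect worker can also load plugins from the directories listed in its plugin.path setting. A minimal sketch, assuming C:/kafka/plugins as the plugin directory (the path is an example):

# In config\connect-distributed.properties
# (forward slashes avoid properties-file backslash escaping)
plugin.path=C:/kafka/plugins
# Each connector gets its own subdirectory containing all of its jars:
#   C:/kafka/plugins/kafka-connect-jdbc/kafka-connect-jdbc-10.7.1.jar
#   C:/kafka/plugins/kafka-connect-jdbc/postgresql-42.6.0.jar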
3) Start ZooKeeper
# Start the ZooKeeper service
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
4) Start the Kafka broker
# Start the Kafka broker service
.\bin\windows\kafka-server-start.bat config\server.properties
5) Check the topic

# Check the topic "person" that the Source connector will write to
# (with default broker settings it is auto-created on first write)
bin\windows\kafka-topics.bat --describe --topic person --bootstrap-server localhost:9092
# If it does not exist yet, create it manually
# bin\windows\kafka-topics.bat --create --topic person --bootstrap-server localhost:9092
6) Start Kafka Connect
bin\windows\connect-distributed.bat config\connect-distributed.properties
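Once the worker is up, its REST API should answer on port 8083; a quick sanity check:

# The root endpoint returns the worker version and Kafka cluster id
curl http://localhost:8083/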
【3】Source configuration
The configuration below was created with reference to:
https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
key | Explanation
---|---
mode | One of: bulk, timestamp, incrementing, timestamp+incrementing (see the fragment after this table)
incrementing.column.name | For mode=incrementing, the name of the strictly increasing column to track
table.whitelist | Target table names, comma-separated (e.g. "table.whitelist": "User, Address, Email")
topic.prefix | Prefix for the topic name (the topic becomes {topic.prefix}{table name})
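For reference, a hedged sketch of what a timestamp+incrementing source might look like; the updated_at column is hypothetical and is not part of this article's person table:

"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id"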
1) Create the Source configuration file
source-postgresql-demo-data.json
{ "name": "source-postgresql-demo-data", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url" : "jdbc:postgresql://localhost:5432/sample_db", "connection.user" : "connect_user", "connection.password" : "password", "mode" : "incrementing", "incrementing.column.name" : "id", "table.whitelist" : "person", "tasks.max" : "1" } }
2) Start Kafka Connect as a Source
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @source-postgresql-demo-data.json
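As an alternative, the REST API also supports an idempotent PUT that creates the connector if it is missing and updates it otherwise. Note that the request body must then be only the contents of the "config" object, without the outer name/config wrapper (the file name below is hypothetical):

curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/source-postgresql-demo-data/config -d @source-postgresql-demo-data-config-only.json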
3) Verify it works
[1] Check the connector list
curl http://localhost:8083/connectors
# ["source-postgresql-demo-data"] が表示
[2] Check the connector status
curl http://localhost:8083/connectors/source-postgresql-demo-data/status
# Response
{"name":"source-postgresql-demo-data","connector":{"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"}],"type":"source"}
[3] Start a consumer (check that the data arrived)
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic person --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"person"},"payload":{"id":1,"name":"Mike"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":false,"name":"person"},"payload":{"id":2,"name":"Smith"}}
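The schema/payload envelope above is produced by the worker's default JsonConverter with schemas enabled. For reference, schemas could be turned off in connect-distributed.properties as sketched below, but the JDBC Sink connector used next requires schema information, so leave them enabled for this walkthrough:

# In config\connect-distributed.properties (do NOT apply for this article)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false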
【4】Sink configuration
1) SQL used for setup
CREATE DATABASE sample_db_clone;
-- Connect to sample_db_clone before running the rest (e.g. \c sample_db_clone in psql)
CREATE USER connect_user_for_clone with password 'password';
CREATE TABLE person (
  id bigint PRIMARY KEY,
  name varchar(256)
);
GRANT ALL ON person TO connect_user_for_clone;
2) Create the Sink configuration file
The configuration below was created with reference to:
https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
sink-postgresql-demo-data.json
{ "name": "sink-postgresql-demo-data", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://localhost:5432/sample_db_clone", "connection.user": "connect_user_for_clone", "connection.password": "password", "insert.mode": "insert", "pk.mode": "record_value", "pk.fields": "id", "auto.create": "false", "topics": "person" } }
3) Start Kafka Connect as a Sink
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @sink-postgresql-demo-data.json
4) Verify it works
[1] Check the connector list
curl http://localhost:8083/connectors
# ["source-postgresql-demo-data","sink-postgresql-demo-data"] が表示
[2] Check the connector status
curl http://localhost:8083/connectors/sink-postgresql-demo-data/status
# Response
{"name":"sink-postgresql-demo-data","connector":{"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"XXX.XXX.XXX.XXX:8083"}],"type":"sink"}
[3] Add data (insert into the source database sample_db)
INSERT INTO person(id, name) VALUES (3, 'Yom');
INSERT INTO person(id, name) VALUES (4, 'Kebin');
INSERT INTO person(id, name) VALUES (5, 'Tim');
INSERT INTO person(id, name) VALUES (6, 'Ken');
[4] Check that the data arrived in the sink database (sample_db_clone)
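For example, query the sink database with psql (a sketch; any SQL client works, and the CSV-style dump below shows the expected contents):

psql -h localhost -U connect_user_for_clone -d sample_db_clone -c "SELECT id, name FROM person ORDER BY id;"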
"id","name" "1","Mike" "2","Smith" "3","Yom" "4","Kebin" "5","Tim" "6","Ken"
【5】Cleanup
1) Delete the connectors
curl -X DELETE http://localhost:8083/connectors/source-postgresql-demo-data
curl -X DELETE http://localhost:8083/connectors/sink-postgresql-demo-data
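If the deletion succeeded, the connector list should be empty again:

curl http://localhost:8083/connectors
# [] is returned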
Related articles
Kafka Connect ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Environment setup / Docker Compose ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Amazon MSK ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ Basics / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Scala ~ Apache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
Scala ~ Apache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737