■ はじめに
https://dk521123.hatenablog.com/entry/2023/04/23/235534 https://dk521123.hatenablog.com/entry/2023/04/26/103421 https://dk521123.hatenablog.com/entry/2023/04/27/235703 https://dk521123.hatenablog.com/entry/2023/04/28/014737
の続き。 今回は、Kafka Connect について、扱う。
目次
【1】Kafka Connect 【2】Kafka Connect の種類 1)Source 2)Sink 【3】Kafka Connect の実行モード 1)スタンドアローンモード 2)分散モード 【4】Quick Start 0)実行前の下準備 1)Source 補足:設定ファイル 【5】REST API 1)Source 2)Sink 3)Connectorの削除
【1】Kafka Connect
* Kafkaと周辺のシステム間で ストリームデータをやりとりするためのツール => 英語だけど、Kafka Connect の説明動画あり。
https://developer.confluent.io/learn-kafka/kafka-connect/how-connectors-work/
【2】Kafka Connect の種類
構成図
Producer => [Source] => Kafka => [Sink] => Consumer
1)Source
* Producer (Any to Kafka) 側の Connector
2)Sink
* Consumer (Kafka to any)側の Connector
【3】Kafka Connect の実行モード
1)スタンドアローンモード 2)分散モード
1)スタンドアローンモード
* 全ての仕事が1つのプロセスの中で実施
起動例
# Linux bin/connect-standalone.sh \ config/connect-standalone.properties connector1.properties [connector2.properties ...] # [解説] # * connect-standalone.properties ... Workの設定 # * connector1.properties ... Connectorの設定 # Windows bin\windows\connect-standalone.bat \ config\connect-standalone.properties config\connect-file-source.properties
2)分散モード
* 複数台のサーバーで分散処理を実施 * 同じ group.id を使用して複数のワーカープロセスを開始
起動例
# Linux bin/connect-distributed.sh config/connect-distributed.properties # Windows bin\windows\connect-distributed.bat config\connect-distributed.properties
【4】Quick Start
0)実行前の下準備
[1] zookeeper起動
# Start the ZooKeeper service .\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
[2] Kafka broker起動
# Start the Kafka broker service .\bin\windows\kafka-server-start.bat config\server.properties
[3] トピックの確認
# Topic「connect-test」確認 bin\windows\kafka-topics.bat --describe --topic connect-test --bootstrap-server localhost:9092 # もし、何も作成していない場合は、Topic「connect-test」作成 # bin\windows\kafka-topics.bat --create --topic connect-test --bootstrap-server localhost:9092
1)Source
[1] 送信するコンテンツ作成
* Kafkaがインストールしている配下に「test.txt」を置き 送信内容を記述し、保存する ~~~~~~~~~~~~ Hello, world!!!? ~~~~~~~~~~~~
[2] SourceとしてのKafka Connectを起動
bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties
[3] 確認
# test.txt の内容が表示されるか確認 bin\windows\kafka-console-consumer.bat --topic connect-test --from-beginning --bootstrap-server localhost:9092
補足:設定ファイル
config\connect-standalone.properties
# These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9092 # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply # it to key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000
config\connect-file-source.properties
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test
config\connect-file-sink.properties
name=local-console-sink connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector tasks.max=1 topics=connect-test
【5】REST API
* curlコマンドを使って、REST APIを呼び出し、Source/Sinkをしてみる => いつの間にか、Windowsでも使える => 「curl --version」で使えるか確認
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100
1)Source
[1] Sourceのデータ「sample-source.txt」を作成する
Hello World!! Bye
[2] REST APIに送るSource用データ「sample-source.json」を作成する(「connector.class」で「FileStreamSourceConnector」を指定)
{ "name" : "sample-source-data", "config" : { "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector", "file" : "sample-source.txt", "tasks.max" : "1", "topic" : "connect-test" } }
[3] REST APIに送る
curl -X POST --header "Content-Type: application/json" http://localhost:8083/connectors -d @sample-source.json # 「curl http://localhost:8083/connectors/【ConnectionName】/status」で確認可能 curl http://localhost:8083/connectors/sample-source-data/status
補足:コネクターのステータスについて
https://docs.confluent.io/ja-jp/platform/7.1/connect/monitoring.html#connector-and-task-status
Status | Explanations |
---|---|
"state":"UNASSIGNED" | コネクター/タスクは、まだワーカーに割り当てられていない |
"state":"RUNNING" | コネクター/タスクが実行中 |
"state":"PAUSED" | コネクター/タスクが一時停止 |
"state":"FAILED" | コネクター/タスクが失敗 |
2)Sink
[1] REST APIに送るSink用データ「sample-sink.json」を作成する(「connector.class」で「FileStreamSinkConnector」を指定)
{ "name" : "sample-sink-data", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "file" : "sample-sink.txt", "tasks.max" : "1", "topics": "connect-test" } }
[2] REST APIに送る
curl -X POST --header "Content-Type: application/json" http://localhost:8083/connectors -d @sample-sink.json # 「curl http://localhost:8083/connectors/【ConnectionName】/status」で確認可能 curl http://localhost:8083/connectors/sample-sink-data/status
[3] 動作確認(「sample-sink.txt」の内容が「sample-source.txt」と同じことを確認)
Hello World!! Bye
3)Connectorの削除
[1] Connectorの確認
curl http://localhost:8083/connectors <出力結果> ["sample-source-data","sample-sink-data"]
[2] Connectorの削除
curl -X DELETE http://localhost:8083/connectors/sample-source-data curl -X DELETE http://localhost:8083/connectors/sample-sink-data
[3] Connectorの再確認
curl http://localhost:8083/connectors <出力結果> []
参考文献
http://mogile.web.fc2.com/kafka/documentation_8.html
https://tutuz-tech.hatenablog.com/entry/2019/03/19/093259
https://tutuz-tech.hatenablog.com/entry/2019/03/21/000835
https://qiita.com/knoguchi/items/d0b8ed1297829dc58346
https://tjtjtj.hatenablog.com/entry/2019/11/20/230000
関連記事
Kafka Connect ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/10/22/220717
Kafka Connect ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Kafka Connect ~ 基本編 / PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Kafka Connect ~ Connector 構成プロパティ ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131
Kafka Connect ~ Rest API ~
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Kafka Connect での デバッグ方法
https://dk521123.hatenablog.com/entry/2023/08/15/215636
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ Remote debug ~
https://dk521123.hatenablog.com/entry/2023/10/23/125909
kafka-python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/24/000309
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Kafkaコネクタ ~ Kafka用Snowflakeコネクタ ~
https://dk521123.hatenablog.com/entry/2023/06/07/144114
Confluent ~ Local環境構築 ~
https://dk521123.hatenablog.com/entry/2024/05/10/000325
Confluent ~ REST API ~
https://dk521123.hatenablog.com/entry/2024/05/09/144826
Scala ~ Apache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
Scala ~ Apache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100