【Kafka】Kafka Connect ~ 基礎知識編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/04/23/235534 https://dk521123.hatenablog.com/entry/2023/04/26/103421 https://dk521123.hatenablog.com/entry/2023/04/27/235703 https://dk521123.hatenablog.com/entry/2023/04/28/014737

の続き。

今回は、Kafka Connect について、扱う。

目次

【1】Kafka Connect
【2】Kafka Connect の種類
 1)Source
 2)Sink
【3】Kafka Connect の実行モード
 1)スタンドアローンモード
 2)分散モード
【4】Quick Start
 0)実行前の下準備
 1)Source
 補足:設定ファイル
【5】REST API
 1)Source
 2)Sink
 3)Connectorの削除

【1】Kafka Connect

* Kafkaと周辺のシステム間で
 ストリームデータをやりとりするためのツール
 => 英語だけど、Kafka Connect の説明動画あり。

https://developer.confluent.io/learn-kafka/kafka-connect/how-connectors-work/

【2】Kafka Connect の種類

構成図

Producer => [Source] => Kafka => [Sink] => Consumer

1)Source

* Producer (Any to Kafka) 側の Connector 

2)Sink

* Consumer (Kafka to any)側の Connector

【3】Kafka Connect の実行モード

1)スタンドアローンモード
2)分散モード

1)スタンドアローンモード

* 全ての仕事が1つのプロセスの中で実施

起動例

# Linux
bin/connect-standalone.sh \
  config/connect-standalone.properties connector1.properties [connector2.properties ...]

# [解説]
# * connect-standalone.properties ... Workの設定
# * connector1.properties ... Connectorの設定

# Windows
bin\windows\connect-standalone.bat \
 config\connect-standalone.properties config\connect-file-source.properties

2)分散モード

* 複数台のサーバーで分散処理を実施
* 同じ group.id を使用して複数のワーカープロセスを開始

起動例

# Linux
bin/connect-distributed.sh config/connect-distributed.properties

# Windows
bin\windows\connect-distributed.bat config\connect-distributed.properties

【4】Quick Start

0)実行前の下準備

[1] zookeeper起動

# Start the ZooKeeper service
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

[2] Kafka broker起動

# Start the Kafka broker service
.\bin\windows\kafka-server-start.bat config\server.properties

[3] トピックの確認

#  Topic「connect-test」確認
bin\windows\kafka-topics.bat --describe --topic connect-test --bootstrap-server localhost:9092

# もし、何も作成していない場合は、Topic「connect-test」作成
# bin\windows\kafka-topics.bat --create --topic connect-test --bootstrap-server localhost:9092

1)Source

[1] 送信するコンテンツ作成

* Kafkaがインストールしている配下に「test.txt」を置き
  送信内容を記述し、保存する
~~~~~~~~~~~~
Hello, world!!!?
~~~~~~~~~~~~

[2] SourceとしてのKafka Connectを起動

bin\windows\connect-standalone.bat config\connect-standalone.properties config\connect-file-source.properties

[3] 確認

# test.txt の内容が表示されるか確認
bin\windows\kafka-console-consumer.bat --topic connect-test --from-beginning --bootstrap-server localhost:9092

補足:設定ファイル

config\connect-standalone.properties

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

config\connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

config\connect-file-sink.properties

name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test

【5】REST API

* curlコマンドを使って、REST APIを呼び出し、Source/Sinkをしてみる
 => いつの間にか、Windowsでも使える
 => 「curl --version」で使えるか確認

curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100

1)Source

[1] Sourceのデータ「sample-source.txt」を作成する

Hello
World!!
Bye

[2] REST APIに送るSource用データ「sample-source.json」を作成する(「connector.class」で「FileStreamSourceConnector」を指定)

{
  "name" : "sample-source-data",
  "config" : {
    "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file" : "sample-source.txt",
    "tasks.max" : "1",
    "topic" : "connect-test"
  }
}

[3] REST APIに送る

curl -X POST --header "Content-Type: application/json" http://localhost:8083/connectors -d @sample-source.json

# 「curl http://localhost:8083/connectors/【ConnectionName】/status」で確認可能
curl http://localhost:8083/connectors/sample-source-data/status

補足:コネクターのステータスについて
https://docs.confluent.io/ja-jp/platform/7.1/connect/monitoring.html#connector-and-task-status

Status Explanations
"state":"UNASSIGNED" ネクター/タスクは、まだワーカーに割り当てられていない
"state":"RUNNING" ネクター/タスクが実行中
"state":"PAUSED" ネクター/タスクが一時停止
"state":"FAILED" ネクター/タスクが失敗

2)Sink

[1] REST APIに送るSink用データ「sample-sink.json」を作成する(「connector.class」で「FileStreamSinkConnector」を指定)

{
  "name" : "sample-sink-data",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", 
    "file" : "sample-sink.txt",
    "tasks.max" : "1",
    "topics": "connect-test"
  }
}

[2] REST APIに送る

curl -X POST --header "Content-Type: application/json" http://localhost:8083/connectors -d @sample-sink.json

# 「curl http://localhost:8083/connectors/【ConnectionName】/status」で確認可能
curl http://localhost:8083/connectors/sample-sink-data/status

[3] 動作確認(「sample-sink.txt」の内容が「sample-source.txt」と同じことを確認)

Hello
World!!
Bye

3)Connectorの削除

[1] Connectorの確認

curl http://localhost:8083/connectors

<出力結果>
["sample-source-data","sample-sink-data"]

[2] Connectorの削除

curl -X DELETE http://localhost:8083/connectors/sample-source-data
curl -X DELETE http://localhost:8083/connectors/sample-sink-data

[3] Connectorの再確認

curl http://localhost:8083/connectors

<出力結果>
[]

参考文献

http://mogile.web.fc2.com/kafka/documentation_8.html
https://tutuz-tech.hatenablog.com/entry/2019/03/19/093259
https://tutuz-tech.hatenablog.com/entry/2019/03/21/000835
https://qiita.com/knoguchi/items/d0b8ed1297829dc58346
https://tjtjtj.hatenablog.com/entry/2019/11/20/230000

関連記事

Kafka Connect ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/10/22/220717
Kafka Connect ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Kafka Connect ~ 基本編 / PostgreSQL
https://dk521123.hatenablog.com/entry/2023/05/02/233806
Kafka Connect ~ Connector 構成プロパティ ~
https://dk521123.hatenablog.com/entry/2023/06/02/011131
Kafka Connect ~ Rest API
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Kafka Connect での デバッグ方法
https://dk521123.hatenablog.com/entry/2023/08/15/215636
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ Remote debug ~
https://dk521123.hatenablog.com/entry/2023/10/23/125909
kafka-python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/24/000309
Amazon MSK ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/04/21/000000
Amazon MSK ~ 基本編 / Connector ~
https://dk521123.hatenablog.com/entry/2023/05/20/003516
Kafkaコネクタ ~ Kafka用Snowflakeコネクタ ~
https://dk521123.hatenablog.com/entry/2023/06/07/144114
Confluent ~ Local環境構築 ~
https://dk521123.hatenablog.com/entry/2024/05/10/000325
Confluent ~ REST API
https://dk521123.hatenablog.com/entry/2024/05/09/144826
ScalaApache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
ScalaApache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737
curl コマンド
https://dk521123.hatenablog.com/entry/2017/12/05/233100