■ はじめに
https://dk521123.hatenablog.com/entry/2023/04/23/235534
https://dk521123.hatenablog.com/entry/2023/04/24/153846
https://dk521123.hatenablog.com/entry/2023/04/26/103421
で、Apache Kafkaについて、取り上げた。 今回は、Apache Kafkaへの送信部分(Producer)を Scala で実装してみる
目次
【1】サンプル 1)ファイル構成 2)build.sbt 3)main.scala 【2】動作確認 1)前提条件 2)プログラム実行までの下準備 3)作成したプログラム実行 4)メッセージの確認
【1】サンプル
1)ファイル構成
└─hellokafka ├─build.sbt ... 後述「2)build.sbt」参照 └─src └─main └─scala └─ main.scala ... 後述「3)main.scala」参照
2)build.sbt
ThisBuild / organization := "com.hello" ThisBuild / version := "1.0" ThisBuild / scalaVersion := "2.13.10" libraryDependencies ++= Seq( "org.apache.kafka" % "kafka-clients" % "3.4.0", "org.scalatest" %% "scalatest" % "3.2.15" % "test" )
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
3)main.scala
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} object SimpleProducer { def main(args: Array[String]): Unit = { println("Start") val topicName = "hello-world-topic" println(s"connecting to ${topicName}") val props = new Properties() props.put("bootstrap.servers", "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("acks", "all") val producer = new KafkaProducer[String, String](props) val data = new ProducerRecord[String, String]( topicName, "key_first", "This is my first message...") producer.send(data) producer.close() println("Done...") } }
【2】動作確認
1)前提条件
* Scala および Apache Kafka の環境が ローカルPCにセットアップされていること => 設定されていない場合、以下の関連記事を参考に設定すること => 以降は、Windowsで設定されたことを想定して記述していく
Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
2)プログラム実行までの下準備
[1] zookeeper起動
# Start the ZooKeeper service .\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
[2] Kafka broker起動
# Start the Kafka broker service .\bin\windows\kafka-server-start.bat config\server.properties
[3] トピックを作成 & 確認
# Topic「hello-world-topic」作成 bin\windows\kafka-topics.bat --create --topic hello-world-topic --bootstrap-server localhost:9092 # 確認 bin\windows\kafka-topics.bat --describe --topic hello-world-topic --bootstrap-server localhost:9092
3)作成したプログラム実行
sbt run ... [info] running SimpleProducer Start << プログラムでデバッグプリントしたところ connecting to hello-world-topic << プログラムでデバッグプリントしたところ SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Done... << プログラムでデバッグプリントしたところ
4)メッセージの確認
bin\windows\kafka-console-consumer.bat --topic hello-world-topic --from-beginning --bootstrap-server localhost:9092 This is my first message... << ★メッセージが届いている Processed a total of 1 messages
関連記事
Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
Scala ~ Apache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Kafka Connect ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Kafka Connect / PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/05/02/233806