【Scala】Scala ~ Apache Kafka / Producer ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/04/23/235534
https://dk521123.hatenablog.com/entry/2023/04/24/153846
https://dk521123.hatenablog.com/entry/2023/04/26/103421

で、Apache Kafkaについて、取り上げた。

今回は、Apache Kafkaへの送信部分(Producer)を
Scala で実装してみる

目次

【1】サンプル
 1)ファイル構成
 2)build.sbt
 3)main.scala
【2】動作確認
 1)前提条件
 2)プログラム実行までの下準備
 3)作成したプログラム実行
 4)メッセージの確認

【1】サンプル

1)ファイル構成

└─hellokafka
    ├─build.sbt ... 後述「2)build.sbt」参照
    └─src
        └─main
            └─scala
                 └─ main.scala ... 後述「3)main.scala」参照

2)build.sbt

ThisBuild / organization := "com.hello"
ThisBuild / version := "1.0"
ThisBuild / scalaVersion := "2.13.10"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "org.scalatest" %% "scalatest" % "3.2.15" % "test"
)

https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

3)main.scala

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SimpleProducer {
  def main(args: Array[String]): Unit = {
    println("Start")

    val topicName = "hello-world-topic"
    println(s"connecting to ${topicName}")

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")

    val producer = new KafkaProducer[String, String](props)

    val data = new ProducerRecord[String, String](
      topicName, "key_first", "This is my first message...") 
    producer.send(data)
    producer.close()
    println("Done...")
  }
}

【2】動作確認

1)前提条件

* Scala および Apache Kafka の環境が
 ローカルPCにセットアップされていること
 => 設定されていない場合、以下の関連記事を参考に設定すること
 => 以降は、Windowsで設定されたことを想定して記述していく

Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534

2)プログラム実行までの下準備

[1] zookeeper起動

# Start the ZooKeeper service
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

[2] Kafka broker起動

# Start the Kafka broker service
.\bin\windows\kafka-server-start.bat config\server.properties

[3] トピックを作成 & 確認

# Topic「hello-world-topic」作成
bin\windows\kafka-topics.bat --create --topic hello-world-topic --bootstrap-server localhost:9092

# 確認
bin\windows\kafka-topics.bat --describe --topic hello-world-topic --bootstrap-server localhost:9092

3)作成したプログラム実行

sbt run

...
[info] running SimpleProducer
Start  << プログラムでデバッグプリントしたところ
connecting to hello-world-topic << プログラムでデバッグプリントしたところ
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.   
Done... << プログラムでデバッグプリントしたところ

4)メッセージの確認

bin\windows\kafka-console-consumer.bat --topic hello-world-topic --from-beginning --bootstrap-server localhost:9092

This is my first message... << ★メッセージが届いている
Processed a total of 1 messages

関連記事

Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
ScalaApache Kafka / Consumer ~
https://dk521123.hatenablog.com/entry/2023/04/28/014737
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Kafka Connect ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Kafka Connect / PostgreSQL
https://dk521123.hatenablog.com/entry/2023/05/02/233806