【Scala】Scala ~ Apache Kafka / Consumer ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/04/27/235703

の続き。

前回は、Apache Kafkaへの送信部分(Producer)を実装したので
今回は、Apache Kafkaへの受信部分(Consumer)を
Scala で実装してみる

目次

【1】サンプル
 補足: build.sbt
【2】動作確認
 1)前提条件
 2)作成したプログラム実行
 3)メッセージ送信および確認

【1】サンプル

import java.util.{Collections, Properties}
import java.util.regex.Pattern
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    println("Start - Consumer")

    val props = new Properties()
    props.put("group.id", "test")
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    val consumer = new KafkaConsumer(props)
    val topics = List("hello-world-topic")
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(1000).asScala
      for (record <- records) {
        println(s"key: ${record.key()}, value: ${record.value()}")
        println(s"partition: ${record.partition()}, offset: ${record.offset()}")
      }
    }
  }
}

補足: build.sbt

* build.sbt については、前回の記事(以下のURL参照)と同じなので省略

https://dk521123.hatenablog.com/entry/2023/04/27/235703

【2】動作確認

1)前提条件

* 前回の記事を状態(zookeeper・Kafka broker起動、Topic「hello-world-topic」作成済等)
 と同じ状態になっていること

https://dk521123.hatenablog.com/entry/2023/04/27/235703

2)作成したプログラム実行

sbt run

...
[info] running SimpleConsumer
Start - Consumer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
ここから無限ループで表示が止まる(この状態を一旦、キープ)

3)メッセージ送信および確認

[1] Producer のプログラムの一部変更
https://dk521123.hatenablog.com/entry/2023/04/27/235703

    val data = new ProducerRecord[String, String](
      topicName, "key_second", "This is my 2nd message...")

[2] Producer のプログラムを実行

sbt run

...
[info] running SimpleProducer
Start
connecting to hello-world-topic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.   
Done...

[3] Consumer にメッセージが届いているか確認

key: key_second, value: This is my 2nd message... << 「[2] Producer のプログラムを実行」のメッセージを確認
partition: 0, offset: 1

関連記事

Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
ScalaApache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Kafka Connect ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Kafka Connect / PostgreSQL
https://dk521123.hatenablog.com/entry/2023/05/02/233806