■ はじめに
https://dk521123.hatenablog.com/entry/2023/04/27/235703
の続き。 前回は、Apache Kafkaへの送信部分(Producer)を実装したので 今回は、Apache Kafkaへの受信部分(Consumer)を Scala で実装してみる
目次
【1】サンプル 補足: build.sbt 【2】動作確認 1)前提条件 2)作成したプログラム実行 3)メッセージ送信および確認
【1】サンプル
import java.util.{Collections, Properties} import java.util.regex.Pattern import org.apache.kafka.clients.consumer.KafkaConsumer import scala.collection.JavaConverters._ object SimpleConsumer { def main(args: Array[String]): Unit = { println("Start - Consumer") val props = new Properties() props.put("group.id", "test") props.put("bootstrap.servers", "localhost:9092") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("enable.auto.commit", "true") props.put("auto.commit.interval.ms", "1000") val consumer = new KafkaConsumer(props) val topics = List("hello-world-topic") consumer.subscribe(topics.asJava) while (true) { val records = consumer.poll(1000).asScala for (record <- records) { println(s"key: ${record.key()}, value: ${record.value()}") println(s"partition: ${record.partition()}, offset: ${record.offset()}") } } } }
補足: build.sbt
* build.sbt については、前回の記事(以下のURL参照)と同じなので省略
https://dk521123.hatenablog.com/entry/2023/04/27/235703
【2】動作確認
1)前提条件
* 前回の記事を状態(zookeeper・Kafka broker起動、Topic「hello-world-topic」作成済等) と同じ状態になっていること
https://dk521123.hatenablog.com/entry/2023/04/27/235703
2)作成したプログラム実行
sbt run ... [info] running SimpleConsumer Start - Consumer SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. ここから無限ループで表示が止まる(この状態を一旦、キープ)
3)メッセージ送信および確認
[1] Producer のプログラムの一部変更
https://dk521123.hatenablog.com/entry/2023/04/27/235703
val data = new ProducerRecord[String, String]( topicName, "key_second", "This is my 2nd message...")
[2] Producer のプログラムを実行
sbt run ... [info] running SimpleProducer Start connecting to hello-world-topic SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Done...
[3] Consumer にメッセージが届いているか確認
key: key_second, value: This is my 2nd message... << 「[2] Producer のプログラムを実行」のメッセージを確認 partition: 0, offset: 1
関連記事
Scala ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331
Spark/Scalaの開発環境構築 ~ Windows編 ~
https://dk521123.hatenablog.com/entry/2023/03/20/115450
Scala ~ Apache Kafka / Producer ~
https://dk521123.hatenablog.com/entry/2023/04/27/235703
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Kafka Connect ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Apache Kafka ~ Kafka Connect / PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/05/02/233806