■ Introduction
https://dk521123.hatenablog.com/entry/2023/03/05/235755
https://dk521123.hatenablog.com/entry/2023/05/09/235256
https://dk521123.hatenablog.com/entry/2023/03/01/235100
This post continues from the articles above and walks through a Hello World with Apache Flink.
Table of Contents
【1】Generating an Apache Flink project
 1) Prerequisites
【2】Running the samples
 1) Job.scala
 2) SocketTextStreamWordCount.scala
 3) WordCount.scala
【3】Hello World
 1) Modify Job.scala
【4】Next Steps
【1】Generating an Apache Flink project
* The Japanese-translated documentation site below looks like a good reference.
1) Prerequisites
[1] JDK 8 or 11 is installed
[2] sbt is installed
 => If sbt is not installed, see the related article below.
Scala ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Using the sbt template
$ sbt new tillrohrmann/flink-project.g8
...
Flink Application Project Using sbt

name [Flink Project]: hello-world
organization [org.example]: sample.com
version [0.1-SNAPSHOT]:
scala_version [2.11.12]:
flink_version [1.14.6]:

# There also appears to be a one-liner alternative:
# bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
【2】Running the samples
* Run them with sbt run
 => "3) WordCount.scala" is the easiest one to run.
1)Job.scala
package org.example

import org.apache.flink.api.scala._

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // execute program
    env.execute("Flink Scala API Skeleton")
  }
}
2)SocketTextStreamWordCount.scala
package org.example

import org.apache.flink.streaming.api.scala._

object SocketTextStreamWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    counts.print

    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
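To try this job, start a listener first (e.g. nc -l 9000) and then sbt run with the host and port as arguments. Separately, the tokenizing step inside the flatMap can be checked without Flink at all; here is a minimal plain-Scala sketch (the TokenizeDemo object and the tokenize helper are my own names, not part of the generated project):

```scala
object TokenizeDemo {
  // Same logic as the flatMap step in SocketTextStreamWordCount:
  // lowercase, split on runs of non-word characters, drop empty tokens
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit =
    println(tokenize("Hello, Flink World!").mkString(" ")) // hello flink world
}
```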
3)WordCount.scala
// Word count
package org.example

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements(
      "This is a sample program,--that is the question:--",
      "Can you help me?",
      "This is a flink demo...",
      "Can u tell me IT teches?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute and print result
    counts.print()
  }
}
Output
[info] (is,3)
[info] (u,1)
[info] (a,2)
[info] (you,1)
[info] (flink,1)
[info] (teches,1)
[info] (tell,1)
[info] (the,1)
[info] (demo,1)
[info] (this,2)
[info] (can,2)
[info] (me,2)
[info] (program,1)
[info] (sample,1)
[info] (help,1)
[info] (it,1)
[info] (question,1)
[info] (that,1)
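As a sanity check on those numbers, the same flatMap -> map to (word, 1) -> groupBy(0) -> sum(1) pipeline can be reproduced with plain Scala collections, with no Flink dependency (the WordCountCheck object and the wordCount helper are my own names):

```scala
object WordCountCheck {
  // Plain-collections mirror of the Flink batch pipeline:
  // tokenize each line, then count occurrences per word
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val text = Seq(
      "This is a sample program,--that is the question:--",
      "Can you help me?",
      "This is a flink demo...",
      "Can u tell me IT teches?")
    // Print in (word,count) form, sorted for readability
    wordCount(text).toSeq.sortBy(_._1).foreach { case (w, n) => println(s"($w,$n)") }
  }
}
```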
【3】Hello World
1) Modify Job.scala
// Read/write files
package org.example

import org.apache.flink.api.scala._

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val lines = env.readTextFile("/mnt/c/tmp/hello.txt")
    val counts = lines.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.writeAsCsv("/mnt/c/tmp/out.txt", "\n", " ")

    // execute program
    env.execute("Flink Scala API Skeleton")
  }
}
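Without a Flink runtime, the same read-count-write flow can be sketched with plain Scala I/O: scala.io.Source stands in for readTextFile, and a PrintWriter stands in for writeAsCsv with "\n" and " " as separators. The FileWordCount object and the countFile helper are my own names; the /mnt/c/tmp paths just follow the WSL paths used above.

```scala
import java.io.PrintWriter
import scala.io.Source

object FileWordCount {
  // Read a text file, count words, and write "word count" rows,
  // mirroring readTextFile -> flatMap/groupBy/sum -> writeAsCsv
  def countFile(inPath: String, outPath: String): Unit = {
    val source = Source.fromFile(inPath)
    val lines =
      try source.getLines().toList
      finally source.close()

    val counts = lines
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

    val out = new PrintWriter(outPath)
    try counts.foreach { case (w, n) => out.println(s"$w $n") }
    finally out.close()
  }

  def main(args: Array[String]): Unit =
    countFile("/mnt/c/tmp/hello.txt", "/mnt/c/tmp/out.txt")
}
```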
【4】Next Steps
* Having gotten a rough feel for the runtime environment, the next step is to work through various samples and build up hands-on experience.
 => The following look useful:
https://mogile.web.fc2.com/flink/flink-docs-release-1.4/dev/batch/index.html
http://mogile.web.fc2.com/flink/flink-docs-release-1.3/dev/batch/examples.html
https://naiveskill.com/flink-wordcount-scala/
Asynchronous I/O
http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/stream/operators/asyncio.html
Related Articles
Apache Flink ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/03/05/235755
Apache Flink ~ Environment Setup / Kubernetes ~
https://dk521123.hatenablog.com/entry/2023/05/09/235256
Apache Flink ~ Environment Setup / Docker Compose ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039
Apache Flink ~ Flink to PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/08/12/133352
Apache Flink ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ Handling Arguments / ParameterTool ~
https://dk521123.hatenablog.com/entry/2023/08/01/004207
Apache Flink ~ Streaming vs Batch ~
https://dk521123.hatenablog.com/entry/2023/07/02/142843
Apache Flink ~ DataStream API ~
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ Table API & SQL ~
https://dk521123.hatenablog.com/entry/2023/10/04/001517
Apache Flink ~ RichParallelSourceFunction ~
https://dk521123.hatenablog.com/entry/2023/09/15/000000
Scala ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331