【Flink】Apache Flink ~ Getting Started ~

■ Introduction

https://dk521123.hatenablog.com/entry/2023/03/05/235755
https://dk521123.hatenablog.com/entry/2023/05/09/235256
https://dk521123.hatenablog.com/entry/2023/03/01/235100

Continuing from the articles above.

This time, let's try a "Hello world" with Apache Flink.

Table of Contents

【1】Generating an Apache Flink project
 1)Prerequisites
【2】Running the samples
 1)Job.scala
 2)SocketTextStreamWordCount.scala
 3)WordCount.scala
【3】Hello world
 1)Modifying Job.scala
【4】Next Steps

【1】Generating an Apache Flink project

* The following Japanese translation of the official quickstart is a good reference

http://mogile.web.fc2.com/flink/flink-docs-release-1.5/quickstart/scala_api_quickstart.html#build-tools

1)Prerequisites

[1] JDK 8 or 11 is installed
[2] sbt is installed

=> If they are not installed, see the related article below

Scala ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805

Using the sbt template

$ sbt new tillrohrmann/flink-project.g8

...
Flink Application Project Using sbt 

name [Flink Project]: hello-world
organization [org.example]: sample.com
version [0.1-SNAPSHOT]: 
scala_version [2.11.12]: 
flink_version [1.14.6]: 

# There also seems to be a quickstart script:
# bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)

【2】Running the samples

* Run with sbt run
 => Of the three classes, "3)WordCount.scala" is the easiest to run.
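Since the template contains several main classes, sbt run will prompt you to pick one; a specific class can also be launched directly with runMain (assuming the default org.example package from the template):

```shell
# Run and choose a main class interactively
sbt run

# Or run a specific main class directly
sbt "runMain org.example.WordCount"
```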

1)Job.scala

package org.example

import org.apache.flink.api.scala._

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // execute program
    env.execute("Flink Scala API Skeleton")
  }
}

2)SocketTextStreamWordCount.scala

package org.example

import org.apache.flink.streaming.api.scala._

object SocketTextStreamWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Split the socket text into words and keep a running count per word
    val text = env.socketTextStream(hostName, port)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .sum(1)

    counts.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
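To try this sample, open a socket source with netcat in one terminal, then start the job pointing at it from another (port 9000 is an arbitrary choice here):

```shell
# Terminal 1: listen on port 9000 and type some input lines
nc -lk 9000

# Terminal 2: run the word counter against that socket
sbt "runMain org.example.SocketTextStreamWordCount localhost 9000"
```

Each line typed into netcat is tokenized, and the job prints updated running counts per word.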

3)WordCount.scala

// Word count
package org.example

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements("This is a sample program,--that is the question:--",
      "Can you help me?", "This is a flink demo...",
      "Can u tell me IT teches?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute and print result
    counts.print()
  }
}

Output:

[info] (is,3)
[info] (u,1)
[info] (a,2)
[info] (you,1)
[info] (flink,1)
[info] (teches,1)
[info] (tell,1)
[info] (the,1)
[info] (demo,1)
[info] (this,2)
[info] (can,2)
[info] (me,2)
[info] (program,1)
[info] (sample,1)
[info] (help,1)
[info] (it,1)
[info] (question,1)
[info] (that,1)

【3】Hello world

1)Modifying Job.scala

// Read from / write to a file
package org.example

import org.apache.flink.api.scala._

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val lines = env.readTextFile("/mnt/c/tmp/hello.txt")
    val counts = lines.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.writeAsCsv("/mnt/c/tmp/out.txt", "\n", " ")

    // execute program
    env.execute("Flink Scala API Skeleton")
  }
}
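As a sanity check for what this job computes, the same read → count → write-CSV flow can be sketched in plain Scala with no Flink involved. LocalWordCount and the temp-file paths below are illustrative stand-ins for the hard-coded /mnt/c/tmp ones:

```scala
import java.nio.file.Files
import scala.io.Source

// Plain-Scala equivalent of the Flink job above: read lines, count words,
// and write "word count" pairs, one per line, separated by a space.
object LocalWordCount {
  def countWords(lines: Iterator[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty)) // tokenize, like flatMap
      .toSeq
      .groupBy(identity)                                       // like groupBy(0)
      .map { case (word, hits) => word -> hits.size }          // like sum(1)

  def main(args: Array[String]): Unit = {
    val in  = Files.createTempFile("hello", ".txt")
    val out = Files.createTempFile("out", ".txt")
    Files.write(in, "hello world hello flink".getBytes("UTF-8"))

    val src = Source.fromFile(in.toFile)
    val counts = try countWords(src.getLines()) finally src.close()

    // Same layout as writeAsCsv(path, "\n", " ")
    val csv = counts.map { case (w, n) => s"$w $n" }.mkString("\n")
    Files.write(out, csv.getBytes("UTF-8"))
    println(new String(Files.readAllBytes(out), "UTF-8"))
  }
}
```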

【4】Next Steps

* Now that I have a rough feel for the runtime environment,
 the next step is to work through various samples and broaden my repertoire
 => The following look like useful references

https://mogile.web.fc2.com/flink/flink-docs-release-1.4/dev/batch/index.html
http://mogile.web.fc2.com/flink/flink-docs-release-1.3/dev/batch/examples.html
https://naiveskill.com/flink-wordcount-scala/
Asynchronous I/O
http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/stream/operators/asyncio.html

Related Articles

Apache Flink ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/03/05/235755
Apache Flink ~ Environment Setup / Kubernetes
https://dk521123.hatenablog.com/entry/2023/05/09/235256
Apache Flink ~ Environment Setup / Docker Compose ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039
Apache Flink ~ Flink to PostgreSQL
https://dk521123.hatenablog.com/entry/2023/08/12/133352
Apache Flink ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ Handling Arguments / ParameterTool ~
https://dk521123.hatenablog.com/entry/2023/08/01/004207
Apache Flink ~ Streaming vs Batch ~
https://dk521123.hatenablog.com/entry/2023/07/02/142843
Apache Flink ~ DataStream API
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ Table API & SQL
https://dk521123.hatenablog.com/entry/2023/10/04/001517
Apache Flink ~ RichParallelSourceFunction ~
https://dk521123.hatenablog.com/entry/2023/09/15/000000
Scala ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/03/10/193805
Scala ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/03/12/184331