【Flink】Apache Flink ~ 基本編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/03/01/235100
https://dk521123.hatenablog.com/entry/2023/05/29/000000

の続き。

今回は、Apache Flink の ベースとなるAPIやサンプルをまとめる。

目次

【1】Flink ソース外観
 1)外観
【2】主なAPI
 1)Source
 2)Operator
 3)Sink 
【3】サンプル
 例1:シンプルな入出力
 例2:ファイルによる入出力

【1】Flink ソース外観

* 全部で 5 Step

1)外観

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._

// Step1: Set up the execution environment (初めの決まり文句)
val env = ExecutionEnvironment.getExecutionEnvironment

// Step2: Read Source(入力)
val text = env.socketTextStream("localhost", "9024")

// Step3: Operator (処理)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .keyBy(0)
  .sum(1)

// Step4: Sink / Output as text file (出力)
counts.writeAsText("output")

// Step5:  Execute program (最後の決まり文句)
env.execute("Done")

【2】主なAPI

1)Source

http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/batch/#data-sources

# API Explanations Example
1 fromElements 指定されたオブジェクトの系列からデータセットを生成 val text = env.fromElements("...")
2 readTextFile ファイルを行ごとに読み込み、文字列として返す val text = env.readTextFile("input")
3 readCsvFile カンマ(あるいは他の文字)でフィールドが区切られたファイルをパース val text = env.readCsvFile("input")
4 addSource 新しいソース関数を付け加えます。例えば、Apache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer<>(...))を使うことができる -

2)Operator

* 紹介していないものについては、以下を参照。

http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/batch/#dataset-transformations

# API Explanations Example
1 Map 一つの要素を取り、一つの要素を生成 data.map { x => x.toInt }
2 FlatMap 一つの要素を取り、一つの要素を生成 data.flatMap { str => str.split(" ") }
3 MapPartition パーティション単位で分散処理の定義 data.mapPartition { in => in map { (_, 1) } }
4 filter 条件がtrueになるものだけ返す data.filter { _ > 1000 }

3)Sink

http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/batch/#data-sinks

# API Explanations Example
1 print 標準出力 counts.print()
2 printToErr 標準エラー出力 counts.printToErr()
3 writeAsText 標準エラー出力 counts.writeAsText("output")
4 writeAsCsv カンマ区切りの値のファイルとしてタプルで出力 counts.writeAsCsv("output2", "\n", ",")
5 writeToSocket ソケットとして出力 counts..writeToSocket(socketHost, portSink, new SimpleStringSchema())
6 addSink 独自のシンク関数を起動。Flinkはシンク関数として実装された(Apache Kafkaのような)他のシステムへのコネクタと一緒にまとめられる -

【3】サンプル

例1:シンプルな入出力

package dk.com

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data (fromElements で読み込む)
    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute and print result
    counts.print()
  }
}

input/word.txt

hello world
hello flink
hello scala

例2:ファイルによる入出力

package dk.com

import org.slf4j.Logger
import org.slf4j.LoggerFactory

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector

object HelloWorld {
  val LOG = LoggerFactory.getLogger("HelloWorld")

  def main(args: Array[String]): Unit = {
    LOG.info("Start!")

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile("input")
    val counts = text
      .flatMap(value => value.split("\\s+"))
      .map(value => (value,1))
      .groupBy(0)
      .sum(1)

    counts.writeAsText("output")
    counts.writeAsCsv("output2", "\n", ",")

    env.execute("temp")
    LOG.info("Done...")
  }
}

関連記事

Apache Flink ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/05/235755
Apache Flink ~ 環境構築編 / Kubernetes
https://dk521123.hatenablog.com/entry/2023/05/09/235256
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039
Apache Flink ~ Flink to PostgreSQL
https://dk521123.hatenablog.com/entry/2023/08/12/133352
Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 引数の扱い / ParameterTool ~
https://dk521123.hatenablog.com/entry/2023/08/01/004207
Apache Flink ~ Streaming vs Batch ~
https://dk521123.hatenablog.com/entry/2023/07/02/142843
Apache Flink ~ DataStream API
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ Table API & SQL
https://dk521123.hatenablog.com/entry/2023/10/04/001517
Apache Flink ~ RichParallelSourceFunction ~
https://dk521123.hatenablog.com/entry/2023/09/15/000000