■ はじめに
https://dk521123.hatenablog.com/entry/2023/03/01/235100
https://dk521123.hatenablog.com/entry/2023/05/29/000000
の続き。 今回は、Apache Flink の ベースとなるAPIやサンプルをまとめる。
目次
【1】Flink ソース外観 1)外観 【2】主なAPI 1)Source 2)Operator 3)Sink 【3】サンプル 例1:シンプルな入出力 例2:ファイルによる入出力
【1】Flink ソース外観
* 全部で 5 Step
1)外観
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala._ // Step1: Set up the execution environment (初めの決まり文句) val env = ExecutionEnvironment.getExecutionEnvironment // Step2: Read Source(入力) val text = env.socketTextStream("localhost", "9024") // Step3: Operator (処理) val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .keyBy(0) .sum(1) // Step4: Sink / Output as text file (出力) counts.writeAsText("output") // Step5: Execute program (最後の決まり文句) env.execute("Done")
【2】主なAPI
1)Source
http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/batch/#data-sources
# | API | Explanations | Example |
---|---|---|---|
1 | fromElements | 指定されたオブジェクトの系列からデータセットを生成 | val text = env.fromElements("...") |
2 | readTextFile | ファイルを行ごとに読み込み、文字列として返す | val text = env.readTextFile("input") |
3 | readCsvFile | カンマ(あるいは他の文字)でフィールドが区切られたファイルをパース | val text = env.readCsvFile("input") |
4 | addSource | 新しいソース関数を付け加えます。例えば、Apache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer<>(...))を使うことができる | - |
2)Operator
* 紹介していないものについては、以下を参照。
http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/batch/#dataset-transformations
# | API | Explanations | Example |
---|---|---|---|
1 | Map | 一つの要素を取り、一つの要素を生成 | data.map { x => x.toInt } |
2 | FlatMap | 一つの要素を取り、一つの要素を生成 | data.flatMap { str => str.split(" ") } |
3 | MapPartition | パーティション単位で分散処理の定義 | data.mapPartition { in => in map { (_, 1) } } |
4 | filter | 条件がtrueになるものだけ返す | data.filter { _ > 1000 } |
3)Sink
http://mogile.web.fc2.com/flink/flink-docs-release-1.5/dev/batch/#data-sinks
# | API | Explanations | Example |
---|---|---|---|
1 | 標準出力 | counts.print() | |
2 | printToErr | 標準エラー出力 | counts.printToErr() |
3 | writeAsText | 標準エラー出力 | counts.writeAsText("output") |
4 | writeAsCsv | カンマ区切りの値のファイルとしてタプルで出力 | counts.writeAsCsv("output2", "\n", ",") |
5 | writeToSocket | ソケットとして出力 | counts..writeToSocket(socketHost, portSink, new SimpleStringSchema()) |
6 | addSink | 独自のシンク関数を起動。Flinkはシンク関数として実装された(Apache Kafkaのような)他のシステムへのコネクタと一緒にまとめられる | - |
【3】サンプル
例1:シンプルな入出力
package dk.com import org.apache.flink.api.scala._ object WordCount { def main(args: Array[String]): Unit = { // set up the execution environment val env = ExecutionEnvironment.getExecutionEnvironment // get input data (fromElements で読み込む) val text = env.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,") val counts = text.flatMap { _.toLowerCase.split("\\W+") } .map { (_, 1) } .groupBy(0) .sum(1) // execute and print result counts.print() } }
input/word.txt
hello world hello flink hello scala
例2:ファイルによる入出力
package dk.com import org.slf4j.Logger import org.slf4j.LoggerFactory import org.apache.flink.api.scala._ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector object HelloWorld { val LOG = LoggerFactory.getLogger("HelloWorld") def main(args: Array[String]): Unit = { LOG.info("Start!") val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile("input") val counts = text .flatMap(value => value.split("\\s+")) .map(value => (value,1)) .groupBy(0) .sum(1) counts.writeAsText("output") counts.writeAsCsv("output2", "\n", ",") env.execute("temp") LOG.info("Done...") } }
関連記事
Apache Flink ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/03/05/235755
Apache Flink ~ 環境構築編 / Kubernetes ~
https://dk521123.hatenablog.com/entry/2023/05/09/235256
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039
Apache Flink ~ Flink to PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/08/12/133352
Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 引数の扱い / ParameterTool ~
https://dk521123.hatenablog.com/entry/2023/08/01/004207
Apache Flink ~ Streaming vs Batch ~
https://dk521123.hatenablog.com/entry/2023/07/02/142843
Apache Flink ~ DataStream API ~
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ Table API & SQL ~
https://dk521123.hatenablog.com/entry/2023/10/04/001517
Apache Flink ~ RichParallelSourceFunction ~
https://dk521123.hatenablog.com/entry/2023/09/15/000000