【Flink】Apache Flink ~ DataStream API ~

■ Introduction

While looking into connecting Flink to Lake Formation,
the DataStream API came up as a candidate, so I took a closer look at it.

Table of Contents

【1】DataStream API
 1)Official documentation
 2)Other sites
【2】Setup - Dependencies -
 1)sbt
【3】Samples
 Example 1: Hello World

【1】DataStream API

* The core of the Flink DataStream API is the DataStream class, which represents a stream of elements of the same type (see the short sketch below)

https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html
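
To get a feel for the API on its own, here is a minimal sketch of a pure DataStream job (the object name and the greeting text are my own; it relies only on the flink-streaming-scala dependency listed in 【2】):

import org.apache.flink.streaming.api.scala._

object DataStreamSketch {
  def main(args: Array[String]): Unit = {
    // create the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // build a DataStream[String] from a bounded, in-memory source
    val names: DataStream[String] = env.fromElements("Alice", "Bob", "John")

    // transform each element and attach a printing sink
    names
      .map(name => s"Hello, $name")
      .print()

    // trigger execution
    env.execute("DataStream sketch")
  }
}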

Note

That said, looking into it more closely, this is essentially
what the related articles I have already written (for example, the URL below)
have been doing all along...

https://dk521123.hatenablog.com/entry/2023/07/23/161621

1)Official documentation

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/
Old, but in Japanese:
http://mogile.web.fc2.com/flink/flink-docs-release-1.3/dev/datastream_api.html

2)Other sites

https://qiita.com/KentOhwada_AlibabaCloudJapan/items/5a50637354bd4597913e

【2】Setup - Dependencies -

1)sbt

val flinkVersion = "1.16.1"

val flinkDependencies = Seq(
  // ...
  // Add the Scala DataStream API and the Table API bridge
  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
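
For reference, a minimal build.sbt sketch that wires these dependencies in (the project name and the Scala version are assumptions; Flink 1.16 ships its Scala APIs for Scala 2.12):

// build.sbt (sketch): flinkVersion / flinkDependencies are the values defined above
ThisBuild / scalaVersion := "2.12.17"

lazy val root = (project in file("."))
  .settings(
    name := "flink-datastream-sample",
    libraryDependencies ++= flinkDependencies
  )

Note that dependencies marked "provided" are not on the classpath of a plain sbt run; the official Flink sbt quickstart works around this by running the main class with the full classpath.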

【3】Samples

Example 1: Hello World

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Demo {
  def main(args: Array[String]): Unit = {
    // create environments of both APIs
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // create a DataStream
    val dataStream = env.fromElements("Alice", "Bob", "John")

    // interpret the insert-only DataStream as a Table
    val inputTable = tableEnv.fromDataStream(dataStream)

    // register the Table object as a view and query it
    tableEnv.createTemporaryView("InputTable", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable")

    // interpret the insert-only Table as a DataStream again
    val resultStream = tableEnv.toDataStream(resultTable)

    // add a printing sink and execute in DataStream API
    resultStream.print()
    env.execute()
    // prints:
    // +I[ALICE]
    // +I[BOB]
    // +I[JOHN]
  }
}
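
As a follow-up sketch of my own (not part of the official sample): the same round trip, but with the SQL expression aliased and the resulting rows mapped back to plain strings in the DataStream API. The Row field is read by position via getField(0); the object name and the greeting text are assumptions.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DemoGreeting {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // DataStream -> Table, registered as a view
    val dataStream = env.fromElements("Alice", "Bob", "John")
    tableEnv.createTemporaryView("InputTable", tableEnv.fromDataStream(dataStream))

    // alias the expression so the output schema has a readable column name
    val resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) AS name FROM InputTable")

    // Table -> DataStream[Row], then keep processing in the DataStream API
    tableEnv.toDataStream(resultTable)
      .map(row => s"Hello, ${row.getField(0)}")  // position 0 = the "name" column
      .print()

    env.execute()
    // prints (subtask prefixes omitted, ordering may vary):
    // Hello, ALICE
    // Hello, BOB
    // Hello, JOHN
  }
}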

Related articles

Apache Flink ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ Table API & SQL ~
https://dk521123.hatenablog.com/entry/2023/10/04/001517
LakeFormation ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2020/10/13/094041