【Flink】Apache Flink ~ Table API & SQL ~

■ Introduction

https://dk521123.hatenablog.com/entry/2023/10/03/212007

This entry continues from the article above.

I also had ChatGPT generate sample code for connecting Flink to Lake Formation (covered in 【4】).

Table of Contents

【1】Table API & SQL
 1)Table API
 2)SQL
 3)Catalogs
【2】Setup - Dependencies -
 1)sbt
【3】Sample
 Example 1: Hello world
【4】Extra: Connecting to Lake Formation

【1】Table API & SQL

* Lets you work with data much like you would in PySpark

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/overview/

1)Table API

* An API for treating data (streaming or batch alike) like a table (see the sketch below)

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/
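
As a quick taste, here is a minimal Table API sketch. It is self-contained (fromValues provides inline test data), and the column names and filter condition are just arbitrary examples:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableApiSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Inline test data (fromValues avoids needing an external source)
    val input = tableEnv
      .fromValues(row("Alice", 12), row("Bob", 10), row("Alice", 100))
      .as("name", "score")

    // Same idea as SQL "SELECT name, score FROM ... WHERE score > 10"
    val result = input
      .filter($"score" > 10)
      .select($"name", $"score")

    result.execute().print()
  }
}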

2)SQL

* Lets you process data with SQL-style queries (see the sketch below)

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#sql
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/overview/
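
The SQL counterpart, as a fragment continuing inside the previous sketch's main method (the query mirrors the one in the Hello world sample in 【3】):

    // Register the inline table as a view, then query it with plain SQL
    tableEnv.createTemporaryView("InputTable", input)
    val resultTable = tableEnv.sqlQuery(
      "SELECT name, SUM(score) AS total FROM InputTable GROUP BY name")

    // DDL and utility statements go through executeSql() instead
    tableEnv.executeSql("SHOW TABLES").print()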

3)Catalogs

* APIs for working with data catalogs (metadata such as databases and tables; see the sketch below)

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/
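
A minimal sketch of the catalog API using the built-in GenericInMemoryCatalog ("my_catalog" / "my_db" are arbitrary names; tableEnv as in the sketches above):

    import org.apache.flink.table.catalog.GenericInMemoryCatalog

    // Register an in-memory catalog and make it the current one
    val catalog = new GenericInMemoryCatalog("my_catalog", "my_db")
    tableEnv.registerCatalog("my_catalog", catalog)
    tableEnv.useCatalog("my_catalog")
    tableEnv.useDatabase("my_db")

    // Tables registered from now on live under my_catalog.my_db
    tableEnv.executeSql("SHOW DATABASES").print()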

Note

There are also catalogs such as JdbcCatalog and HiveCatalog.
Since Lake Formation is Hive-compatible, I thought HiveCatalog might work here too,
but looking at the constructor, the String hiveConfDir argument appears to be required,
so it may not be usable this time (a rough usage sketch follows the links below).

https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/catalog/hive/HiveCatalog.html
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
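
For reference, constructing a HiveCatalog looks like the following. This is a sketch only, assuming the flink-connector-hive dependency (see the note in 【2】) and a hive-site.xml under /etc/hive/conf:

    import org.apache.flink.table.catalog.hive.HiveCatalog

    // "/etc/hive/conf" must contain a hive-site.xml; this is the
    // hiveConfDir constructor argument discussed above (assumed path)
    val hiveCatalog = new HiveCatalog("hive_catalog", "default", "/etc/hive/conf")
    tableEnv.registerCatalog("hive_catalog", hiveCatalog)
    tableEnv.useCatalog("hive_catalog")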

【2】Setup - Dependencies -

* Frankly, this is the same as in the related article below

Apache Flink ~ DataStream API
https://dk521123.hatenablog.com/entry/2023/10/03/212007

1)sbt

val flinkVersion = "1.16.1"

val flinkDependencies = Seq(
  // ...
  // Additions for the Table API & SQL
  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
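
If you want to experiment with HiveCatalog (see 【1】3)), something like the following would also be needed. The exact versions must match your environment, so treat these as assumptions:

  // Assumed additions for HiveCatalog (match hive-exec to your Hive install)
  "org.apache.flink" %% "flink-connector-hive" % flinkVersion % "provided",
  "org.apache.hive" % "hive-exec" % "3.1.2" % "provided"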

【3】Sample

Example 1: Hello world

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    // create environments of both APIs
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // create a DataStream
    val dataStream = env.fromElements(
      Row.of("Alice", Int.box(12)),
      Row.of("Bob", Int.box(10)),
      Row.of("Alice", Int.box(100))
    )(Types.ROW(Types.STRING, Types.INT))

    // interpret the insert-only DataStream as a Table
    val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")

    // register the Table object as a view and query it
    // the query contains an aggregation that produces updates
    tableEnv.createTemporaryView("InputTable", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT name, SUM(score) FROM InputTable GROUP BY name")

    // interpret the updating Table as a changelog DataStream
    val resultStream = tableEnv.toChangelogStream(resultTable)

    // add a printing sink and execute in DataStream API
    resultStream.print()
    env.execute()

    // prints:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]
  }
}

【4】Extra: Connecting to Lake Formation

When I asked ChatGPT
"Give me a sample for connecting Lake formation with Apache Flink in Scala",
it returned the following sample (partially modified).
# ChatGPT is impressive...

However, it does not actually compile (※), so it cannot be used as-is, but it may be a good starting point.

※ For example, the compiler complains that useBlinkPlanner() does not exist
(apparently it was deprecated and has been removed from recent Flink versions)
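
For what it's worth, on recent Flink versions the Blink planner is the only planner left, so that line would now simply be:

// No planner selection needed anymore on recent Flink
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()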

Output from ChatGPT (note: it does not actually compile)

import org.apache.flink.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
// NOTE: the import for StreamExecutionEnvironment
// (org.apache.flink.streaming.api.scala._) is missing - one more compile error

object Hello {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    // NOTE: useBlinkPlanner() was removed in recent Flink versions,
    // so this line no longer compiles
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Define your AWS Lake Formation credentials
    val accessKey = "your_access_key"
    val secretKey = "your_secret_key"
    val region = "us-west-2"

    // Set up the AWS credentials
    // NOTE: Flink has no "JobParameters" class (setGlobalJobParameters expects an
    // ExecutionConfig.GlobalJobParameters such as Configuration), so this fails too
    env.getConfig.setGlobalJobParameters(
      new JobParameters().setString("aws_access_key_id", accessKey).setString("aws_secret_access_key", secretKey).setString("aws.region", region)
    )

    // Define your Lake Formation catalog name and database name
    val catalogName = "your_catalog_name"
    val databaseName = "your_database_name"

    // Register Lake Formation catalog
    // NOTE: this DDL is invented; Flink's CREATE CATALOG takes flat 'key' = 'value'
    // pairs in the WITH clause, and there is no property(...) wrapper or
    // 'catalog-type' = 'lake-formation' option (see the corrected sketch after this sample)
    tableEnv.executeSql(
      s"""
         |CREATE CATALOG lake_catalog WITH (
         |  type = 'hive',
         |  property(
         |    'catalog-type' = 'lake-formation',
         |    'hive.aws-region' = '$region',
         |    'lake-formation-catalog-name' = '$catalogName'
         |  )
         |)
         """.stripMargin)

    // Use the catalog
    tableEnv.useCatalog("lake_catalog")

    // List the available databases
    tableEnv.executeSql(s"SHOW DATABASES").print()

    // Use the desired database
    tableEnv.useDatabase(databaseName)

    // Now you can run your Flink queries on the Lake Formation tables
    val resultTable = tableEnv.sqlQuery("SELECT * FROM your_table")

    // Print the result
    resultTable.toAppendStream[Row].print()

    // Execute the Flink job
    env.execute("Lake Formation Flink Job")
  }
}
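
For reference, the CREATE CATALOG syntax that Flink's Hive connector actually accepts uses flat 'key' = 'value' pairs in the WITH clause. Below is a minimal corrected sketch; the hive-conf-dir path and database name are assumptions, and on AWS (e.g., EMR) a hive-site.xml pointing the metastore at the Glue Data Catalog, which Lake Formation governs, would be the usual route:

    // Corrected catalog registration, assuming a hive-site.xml under /etc/hive/conf
    tableEnv.executeSql(
      """
        |CREATE CATALOG lake_catalog WITH (
        |  'type' = 'hive',
        |  'default-database' = 'default',
        |  'hive-conf-dir' = '/etc/hive/conf'
        |)
      """.stripMargin)
    tableEnv.useCatalog("lake_catalog")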

Related Articles

Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ DataStream API
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ RichParallelSourceFunction ~
https://dk521123.hatenablog.com/entry/2023/09/15/000000
LakeFormation ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2020/10/13/094041