■ Introduction
https://dk521123.hatenablog.com/entry/2023/10/03/212007
A continuation of the entry above. This time: Flink's Table API & SQL. I also had ChatGPT generate sample code for connecting Flink to Lake Formation (see 【4】for how that went).
Table of Contents
【1】Table API & SQL
 1)Table API
 2)SQL
 3)Catalogs
【2】Setup - Dependencies -
 1)sbt
【3】Samples
 Example 1: Hello world
【4】Extra: Connecting to Lake Formation
【1】Table API & SQL
* Lets you work with data in much the same way as PySpark does
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/overview/
1)Table API
* An API for handling data (streaming and batch alike) as if it were a table (a short sketch follows the links below)
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/
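As a rough illustration, here is a minimal, self-contained sketch (my own example, not taken from the docs): it builds a table from in-memory values with fromValues and runs the same kind of aggregation as the SQL sample in 【3】. It assumes the sbt dependencies listed in 【2】.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableApiDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // build a Table from in-memory values (no external source needed)
    val input = tableEnv
      .fromValues(row("Alice", 12), row("Bob", 10), row("Alice", 100))
      .as("name", "score")

    // the same aggregation as the SQL version, expressed with the Table API
    val result = input
      .groupBy($"name")
      .select($"name", $"score".sum.as("total"))

    result.execute().print()
  }
}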
2)SQL
* Lets you process data with SQL-style queries (a short sketch follows the links below)
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#sql
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/overview/
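A minimal sketch of the SQL side (my own example; the table and column names are made up, and it assumes the datagen connector that ships with the Flink distribution):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SqlDemo {
  def main(args: Array[String]): Unit = {
    // a plain TableEnvironment is enough for SQL-only jobs
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // a throwaway bounded source via the datagen connector
    tableEnv.executeSql(
      """
        |CREATE TABLE Orders (
        |  name STRING,
        |  score INT
        |) WITH (
        |  'connector' = 'datagen',
        |  'number-of-rows' = '5'
        |)
        |""".stripMargin)

    // run a query and print the (changelog-style) result
    tableEnv.executeSql(
      "SELECT name, SUM(score) AS total FROM Orders GROUP BY name").print()
  }
}

Note that the StreamTableEnvironment used in 【3】 is only needed when converting to and from DataStreams.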
3)Catalogs
* The API for working with data catalogs, i.e. metadata about databases, tables, and so on (a short sketch follows the link below)
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/
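A small sketch of the Catalog API using the built-in GenericInMemoryCatalog (the catalog name "my_catalog" is arbitrary):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.GenericInMemoryCatalog

object CatalogDemo {
  def main(args: Array[String]): Unit = {
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // register a second, in-memory catalog next to the built-in default one
    tableEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog"))
    tableEnv.useCatalog("my_catalog")

    tableEnv.executeSql("SHOW CATALOGS").print()   // default_catalog, my_catalog
    tableEnv.executeSql("SHOW DATABASES").print()  // default
  }
}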
Note
There are also catalogs such as JdbcCatalog and HiveCatalog. Since Lake Formation is Hive-compatible, I thought HiveCatalog might work here too, but looking at the constructor, a String hiveConfDir appears to be required, so it may not be usable this time (a constructor sketch follows the links below).
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/catalog/hive/HiveCatalog.html
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
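For reference, a sketch of the constructor in question, assuming flink-connector-hive is on the classpath and that /etc/hive/conf is a hypothetical directory containing hive-site.xml:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveCatalogSketch {
  def main(args: Array[String]): Unit = {
    val tableEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // "lake_catalog" and "default" are arbitrary names; the third argument is
    // the hiveConfDir discussed above, which is what makes this awkward for
    // Lake Formation (it expects a directory with a hive-site.xml on disk)
    val hiveCatalog = new HiveCatalog("lake_catalog", "default", "/etc/hive/conf")
    tableEnv.registerCatalog("lake_catalog", hiveCatalog)
    tableEnv.useCatalog("lake_catalog")
  }
}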
【2】Setup - Dependencies -
* Honestly, this is the same as in the related article below:
Apache Flink ~ DataStream API ~
https://dk521123.hatenablog.com/entry/2023/10/03/212007
1)sbt
val flinkVersion = "1.16.1"

val flinkDependencies = Seq(
  // ...
  // Add the following for Table API & SQL
  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
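One caveat about the "provided" scope: sbt run will not see these jars by default. The Flink sbt quickstart template works around this with a run task along these lines (a sketch; adjust for your sbt version):

// make "sbt run" include the provided dependencies on the classpath
Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated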
【3】Samples
Example 1: Hello world
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object Demo {
  def main(args: Array[String]): Unit = {
    // create environments of both APIs
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // create a DataStream
    val dataStream = env.fromElements(
      Row.of("Alice", Int.box(12)),
      Row.of("Bob", Int.box(10)),
      Row.of("Alice", Int.box(100))
    )(Types.ROW(Types.STRING, Types.INT))

    // interpret the insert-only DataStream as a Table
    val inputTable = tableEnv.fromDataStream(dataStream).as("name", "score")

    // register the Table object as a view and query it
    // the query contains an aggregation that produces updates
    tableEnv.createTemporaryView("InputTable", inputTable)
    val resultTable = tableEnv.sqlQuery(
      "SELECT name, SUM(score) FROM InputTable GROUP BY name")

    // interpret the updating Table as a changelog DataStream
    val resultStream = tableEnv.toChangelogStream(resultTable)

    // add a printing sink and execute in DataStream API
    resultStream.print()
    env.execute()
    // prints:
    // +I[Alice, 12]
    // +I[Bob, 10]
    // -U[Alice, 12]
    // +U[Alice, 112]
  }
}
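In the output above, +I marks an inserted row, while -U / +U retract the old value of an updated row and emit the new one: Alice's sum first appears as 12 and is updated to 112 once her second element (100) arrives.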
【4】Extra: Connecting to Lake Formation
When I told ChatGPT "Give me a sample for connecting Lake formation with Apache Flink in Scala", it came back with the sample below (slightly modified).
# ChatGPT is amazing...
However, it does not actually compile (*), so it cannot be used as-is, but it may be a good starting point.
* For example, the compiler complains that useBlinkPlanner() does not exist (looking into it, the method was deprecated and appears to have been removed from the latest Flink).
Output from ChatGPT (warning: it does not actually compile)
import org.apache.flink.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Hello {
  def main(args: Array[String]): Unit = {
    // Set up the execution environment
    // NOTE: useBlinkPlanner() no longer exists in recent Flink (compile error)
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Define your AWS Lake Formation credentials
    val accessKey = "your_access_key"
    val secretKey = "your_secret_key"
    val region = "us-west-2"

    // Set up the AWS credentials
    // NOTE: Flink has no JobParameters class like this (compile error)
    env.getConfig.setGlobalJobParameters(
      new JobParameters()
        .setString("aws_access_key_id", accessKey)
        .setString("aws_secret_access_key", secretKey)
        .setString("aws.region", region)
    )

    // Define your Lake Formation catalog name and database name
    val catalogName = "your_catalog_name"
    val databaseName = "your_database_name"

    // Register Lake Formation catalog
    // NOTE: this WITH ( ... property( ... ) ... ) syntax is not valid Flink DDL either
    tableEnv.executeSql(
      s"""
         |CREATE CATALOG lake_catalog WITH (
         |  type = 'hive',
         |  property(
         |    'catalog-type' = 'lake-formation',
         |    'hive.aws-region' = '$region',
         |    'lake-formation-catalog-name' = '$catalogName'
         |  )
         |)
      """.stripMargin)

    // Use the catalog
    tableEnv.useCatalog("lake_catalog")

    // List the available databases
    tableEnv.executeSql(s"SHOW DATABASES").print()

    // Use the desired database
    tableEnv.useDatabase(databaseName)

    // Now you can run your Flink queries on the Lake Formation tables
    val resultTable = tableEnv.sqlQuery("SELECT * FROM your_table")

    // Print the result
    resultTable.toAppendStream[Row].print()

    // Execute the Flink job
    env.execute("Lake Formation Flink Job")
  }
}
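For comparison, a minimal sketch of the setup part that should compile on recent Flink (1.16 here): useBlinkPlanner() is simply dropped (the Blink planner is now the default and only planner), and the invented catalog DDL is replaced with the standard Hive catalog DDL. It assumes flink-connector-hive is on the classpath and that the hypothetical /path/to/hive-conf holds a hive-site.xml pointing the metastore at the Glue Data Catalog (which is what Lake Formation governs):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object LakeFormationSketch {
  def main(args: Array[String]): Unit = {
    // no useBlinkPlanner(): the Blink planner is the only planner in recent Flink
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // standard Hive catalog DDL; '/path/to/hive-conf' is a hypothetical
    // directory whose hive-site.xml points at the Glue Data Catalog
    tableEnv.executeSql(
      """
        |CREATE CATALOG lake_catalog WITH (
        |  'type' = 'hive',
        |  'hive-conf-dir' = '/path/to/hive-conf'
        |)
        |""".stripMargin)

    tableEnv.useCatalog("lake_catalog")
    tableEnv.executeSql("SHOW DATABASES").print()
  }
}

Also note that AWS credentials would normally be picked up from the environment or an instance profile, rather than set through job parameters as the ChatGPT sample suggests.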
Related Articles
Apache Flink ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ DataStream API ~
https://dk521123.hatenablog.com/entry/2023/10/03/212007
Apache Flink ~ RichParallelSourceFunction ~
https://dk521123.hatenablog.com/entry/2023/09/15/000000
LakeFormation ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2020/10/13/094041