■ Introduction
We set up the environment in
https://dk521123.hatenablog.com/entry/2023/07/28/220039
so this time, let's write a sample of Flink code that writes from Flink to PostgreSQL.
【0】Prerequisites
* The environment has been set up by following the related article below:
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039
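The sample in 【1】 INSERTs into a demo_counter table in demo_db, so that table needs to exist before the job runs. If it was not created during the environment setup, here is a minimal sketch that creates it over plain JDBC; the column types (VARCHAR / INT) are assumptions inferred from the sample code, and the connection settings mirror the ones the sample uses.

// CreateDemoTable.scala (hypothetical helper, not part of the original setup)
// Creates the demo_counter table that the sample job writes to.
// Assumes PostgreSQL is reachable on localhost:5431 (demo_db / postgres / password),
// matching the sample's JdbcConnectionOptions.
import java.sql.DriverManager

object CreateDemoTable {
  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5431/demo_db", "postgres", "password")
    try {
      // Column types are an assumption; adjust them to your needs
      connection.createStatement().execute(
        """CREATE TABLE IF NOT EXISTS demo_counter (
          |  word VARCHAR(255),
          |  counter INT
          |)""".stripMargin)
    } finally {
      connection.close()
    }
  }
}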
【1】Sample
HelloFlinkPostgre.scala

package dk.com

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.connector.jdbc.JdbcSink
import java.util.Properties
import java.sql.PreparedStatement
import scala.language.reflectiveCalls

object HelloFlinkPostgre {
  val LOG = LoggerFactory.getLogger("HelloWorld")

  def main(args: Array[String]): Unit = {
    LOG.info("Start!")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // val text = env.fromElements(
    //   "To be, or not to be,--that is the question:--",
    //   "Whether 'tis nobler in the mind to suffer",
    //   "The slings and arrows of outrageous fortune",
    //   "Or to take arms against a sea of troubles,"
    // )
    val text = env.readTextFile("input")

    // Word count: split each line into lowercase words, key by word, keep a running sum
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    // https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/jdbc/JdbcSink.html
    val jdbcSink = JdbcSink.sink(
      // SQL
      "INSERT INTO demo_counter (word, counter) VALUES (?, ?)",
      // Binds each (word, count) tuple to the INSERT's placeholders
      new JdbcStatementBuilder[(String, Int)] {
        override def accept(statement: PreparedStatement, row: (String, Int)): Unit = {
          statement.setString(1, row._1)
          statement.setInt(2, row._2)
        }
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(1)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5431/demo_db")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("password")
        .build()
    )
    // counts.print()
    counts.addSink(jdbcSink)

    env.execute("temp")
    LOG.info("Done...")
  }
}
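A few points about this sample. The job reads its input from a local file or directory named "input" in the working directory, so put some text there before running. The JdbcExecutionOptions make the sink buffer rows and flush a batch once 1000 records accumulate or 200 ms elapse, retrying a failed batch once; JdbcSink.sink gives at-least-once delivery (there is a separate JdbcSink.exactlyOnceSink based on XA transactions for stronger guarantees). Also note that sum(1) emits an updated count for every incoming word, so demo_counter receives multiple rows per word rather than a single final total.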
build.sbt
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "hello-flink-postgre"
version := "0.1-SNAPSHOT"
organization := "dk.com"

// Modify
ThisBuild / scalaVersion := "2.12.18"

// Modify
val flinkVersion = "1.16.1"

val flinkDependencies = Seq(
  // Add from
  "org.slf4j" % "slf4j-api" % "2.0.7",
  "org.apache.logging.log4j" % "log4j-core" % "2.20.0",
  "org.apache.logging.log4j" % "log4j-slf4j2-impl" % "2.20.0",
  "org.apache.flink" % "flink-clients" % flinkVersion,
  "org.apache.flink" % "flink-state-processor-api" % flinkVersion,
  "org.apache.flink" % "flink-connector-jdbc" % "3.1.1-1.17" % "provided",
  "org.postgresql" % "postgresql" % "42.6.0",
  // Add to
  //"org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided")

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

assembly / mainClass := Some("dk.com.HelloFlinkPostgre")

// make run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
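One design point in this build.sbt: the Flink dependencies are marked "provided" because a real Flink cluster already ships them, so they are excluded from the assembly (fat) jar, and the Compile / run override puts them back on the classpath so that sbt run still works locally. Also, flink-connector-jdbc uses its own versioning scheme now that it lives outside the main Flink repository; the suffix in "3.1.1-1.17" indicates the Flink version it was built against (1.17 here, while the job itself uses 1.16.1), so it is worth double-checking the combination in your own build.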
Execution example
$ sbt run
[info] welcome to sbt 1.3.13 (Ubuntu Java 11.0.19)
[info] loading settings for project hello-flink-postgre-build from assembly.sbt ...
...
[info] 2023/07/28 07:46:43.676 INFO - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
# (A bit startling at first, but) you can ignore this error for now
[error] WARNING: An illegal reflective access operation has occurred
[error] WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/user/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/flink/flink-core/1.16.1/flink-core-1.16.1.jar) to field java.lang.String.value
[error] WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
[error] WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
[error] WARNING: All illegal access operations will be denied in a future release
[info] 2023/07/28 07:46:46.170 INFO - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
...
[info] 2023/07/28 07:46:50.980 INFO - Done...
[info] 2023/07/28 07:46:50.982 INFO - Shut down complete.
[info] 2023/07/28 07:46:50.982 INFO - FileChannelManager removed spill file directory /tmp/flink-io-87405248-7b2c-4a8b-a8b6-77842b72e156
...
[info] 2023/07/28 07:46:51.121 INFO - Stopped Akka RPC service.
[success] Total time: 11 s, completed Jul 28, 2023, 7:46:52 AM
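To check what the job actually wrote, query the demo_counter table. The following read-back helper is a minimal sketch (the object name VerifyDemoCounter is made up for illustration); it reuses the same connection settings as the sample.

// VerifyDemoCounter.scala (hypothetical helper for checking the result)
// Assumes the same PostgreSQL settings as the sample (localhost:5431, demo_db).
import java.sql.DriverManager

object VerifyDemoCounter {
  def main(args: Array[String]): Unit = {
    val connection = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5431/demo_db", "postgres", "password")
    try {
      val resultSet = connection.createStatement()
        .executeQuery("SELECT word, counter FROM demo_counter ORDER BY counter DESC")
      while (resultSet.next()) {
        // Columns by index: 1 = word, 2 = counter
        println(s"${resultSet.getString(1)} = ${resultSet.getInt(2)}")
      }
    } finally {
      connection.close()
    }
  }
}

Because of the running sum and the at-least-once sink, expect several rows per word; the largest counter per word is the final count.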
Related articles
Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Docker compose ~ PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/07/20/025544
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039