【Flink】Apache Flink ~ Flink to PostgreSQL ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/07/28/220039

で、環境構築したが、
今回は、Flink to PostgreSQLのFlinkコードのサンプルを書く

【0】前提条件

* 以下の関連記事を参考に環境構築していること

Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039

【1】サンプル

package dk.com

import org.slf4j.Logger
import org.slf4j.LoggerFactory

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcStatementBuilder
import org.apache.flink.connector.jdbc.JdbcSink
import java.util.Properties
import java.sql.PreparedStatement
import scala.language.reflectiveCalls


object HelloFlinkPostgre {
  // Logger name "HelloWorld" kept as-is so the sample's log output matches the article.
  val LOG = LoggerFactory.getLogger("HelloWorld")

  /**
   * Word-count from the local file "input", written to PostgreSQL.
   *
   * Reads lines, lower-cases and splits them into words, keeps a running
   * per-word count, and INSERTs each (word, count) update into demo_counter
   * through Flink's JdbcSink.
   */
  def main(args: Array[String]): Unit = {
    LOG.info("Start!")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Source: one record per line of the "input" file in the working directory.
    val text = env.readTextFile("input")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      // split("\\W+") yields a leading "" when a line starts with a
      // non-word character — drop empty tokens so no ("", n) rows are inserted.
      .filter(_.nonEmpty)
      .map { (_, 1) }
      // Key selector instead of the deprecated index-based keyBy(0)
      // (tuple-index keyBy/sum was deprecated in Flink 1.12).
      .keyBy(_._1)
      .sum(1)

    // https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/jdbc/JdbcSink.html
    // NOTE(review): sum(1) emits an updated count for EVERY incoming record,
    // so the same word is INSERTed repeatedly. If demo_counter has a unique
    // key on word, switch to INSERT ... ON CONFLICT (word) DO UPDATE.
    val jdbcSink = JdbcSink.sink(
      // Parameterized SQL — values are bound via the statement builder below.
      "INSERT INTO demo_counter (word, counter) VALUES (?, ?)",
      new JdbcStatementBuilder[(String, Int)] {
        override def accept(statement: PreparedStatement, row: (String, Int)): Unit = {
          statement.setString(1, row._1)
          statement.setInt(2, row._2)
        }
      },
      // Flush a batch at 1000 rows or every 200 ms, retry a failed batch once.
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(1)
        .build(),
      // Port 5431 matches the docker-compose setup from the linked article.
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5431/demo_db")
        .withDriverName("org.postgresql.Driver")
        .withUsername("postgres")
        .withPassword("password")
        .build()
    )
    counts.addSink(jdbcSink)

    // Blocks until the (bounded) file source is exhausted and the job finishes.
    env.execute("temp")

    LOG.info("Done...")
  }
}

build.sbt

ThisBuild / resolvers ++= Seq(
    "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
    Resolver.mavenLocal
)

name := "hello-flink-postgre"

version := "0.1-SNAPSHOT"

organization := "dk.com"

ThisBuild / scalaVersion := "2.12.18"

val flinkVersion = "1.16.1"

val flinkDependencies = Seq(
  // Logging: slf4j 2.x API bridged to log4j2.
  "org.slf4j" % "slf4j-api" % "2.0.7",
  "org.apache.logging.log4j" % "log4j-core" % "2.20.0",
  "org.apache.logging.log4j" % "log4j-slf4j2-impl" % "2.20.0",
  // Scala-free Flink artifacts (no %% suffix since Flink 1.15).
  "org.apache.flink" % "flink-clients" % flinkVersion,
  "org.apache.flink" % "flink-state-processor-api" % flinkVersion,
  // Fix: bundle the JDBC connector (it is NOT part of the Flink distribution,
  // so "provided" would cause ClassNotFoundException when the assembly jar
  // runs on a cluster) — consistent with the postgresql driver below.
  // NOTE(review): 3.1.1-1.17 targets Flink 1.17 — confirm compatibility with
  // flinkVersion 1.16.1 or align the two versions.
  "org.apache.flink" % "flink-connector-jdbc" % "3.1.1-1.17",
  "org.postgresql" % "postgresql" % "42.6.0",
  // The Scala APIs are provided by the Flink runtime on the cluster.
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided")

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

// Fix: must name the actual entry point (was "dk.com.Job", which does not exist
// in this project — the fat jar would fail to launch).
assembly / mainClass := Some("dk.com.HelloFlinkPostgre")

// make the run command include the "provided" dependencies on its classpath
Compile / run  := Defaults.runTask(Compile / fullClasspath,
                                   Compile / run / mainClass,
                                   Compile / run / runner
                                  ).evaluated

// stay inside the sbt console when "ctrl-c" is pressed while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude the Scala library from the assembly jar (the cluster provides it)
assembly / assemblyOption  := (assembly / assemblyOption).value.copy(includeScala = false)

実行例

$ sbt run
[info] welcome to sbt 1.3.13 (Ubuntu Java 11.0.19)
[info] loading settings for project hello-flink-postgre-build from assembly.sbt ...
...
[info] 2023/07/28 07:46:43.676 INFO  - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
# (一瞬ビビるが) ひとまず、このエラーはほっといていい
[error] WARNING: An illegal reflective access operation has occurred
[error] WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/user/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/flink/flink-core/1.16.1/flink-core-1.16.1.jar) to field java.lang.String.value
[error] WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
[error] WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
[error] WARNING: All illegal access operations will be denied in a future release
[info] 2023/07/28 07:46:46.170 INFO  - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
...
[info] 2023/07/28 07:46:50.980 INFO  - Done...
[info] 2023/07/28 07:46:50.982 INFO  - Shut down complete.
[info] 2023/07/28 07:46:50.982 INFO  - FileChannelManager removed spill file directory /tmp/flink-io-87405248-7b2c-4a8b-a8b6-77842b72e156
...
[info] 2023/07/28 07:46:51.121 INFO  - Stopped Akka RPC service.
[success] Total time: 11 s, completed Jul 28, 2023, 7:46:52 AM

関連記事

Apache Flink ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ 基本編 ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Docker compose ~ PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/07/20/025544
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039