【Flink】Apache Flink ~ Environment Setup / Docker Compose Edition ~

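■ Introduction

I have been working with Apache Flink,
but there is very little information available.
Raise the bar even slightly,
for example by writing data processed in Flink to a DB,
and there is next to nothing.

So this post presents the sample code for
Apache Flink + PostgreSQL that came out of that trial and error.
To avoid tedious environment setup, the environment is brought up
in one shot with docker compose, and running the job takes
nothing more than sbt run.
(Samples like this seem like they should exist, but are surprisingly hard to find.)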

Table of contents

【1】Goals
 1) Related articles
【2】Sample
 1) Folder structure
 2) Environment-related files
 3) Flink code
【3】Verification
 1) Web UI

【1】Goals

[1] Create a simple Flink + PostgreSQL sample
[2] Build the environment in one shot with docker compose

1) Related articles

* This post partly reuses the following related article

Docker compose ~ PostgreSQL
https://dk521123.hatenablog.com/entry/2023/07/20/025544

【2】Sample

* For running Flink on Docker, see the official documentation below.

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/

* The full code is available in the following GitHub repository

https://github.com/dk521123/HelloWorldForFlinkWithPostgreSQL

1) Folder structure

├── build.sbt
├── compose.yml ... Docker compose file
├── init-db
│   └── init.sql ... SQL to initialize DB 
├── input
│   └── word.txt ... Input file
└── src
    └── main
        ├── resources
        │   └── log4j2.xml
        └── scala
            └── dk.com.HelloFlinkPostgre.scala ... Flink code
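
The build.sbt in the tree above is not reproduced in this post. As a rough idea of what such a file might contain, here is a minimal sketch. Everything in it is an assumption except the Flink version 1.16.1, which matches the flink-core jar that appears in the run log further down; the project name, the Scala version, and the PostgreSQL driver version are hypothetical. Refer to the GitHub repository for the real file.

```scala
// build.sbt (hypothetical sketch, not the repository's actual file)
ThisBuild / scalaVersion := "2.12.17" // assumed; Flink 1.16 Scala API targets 2.12

lazy val root = (project in file("."))
  .settings(
    name := "hello-flink-postgre", // assumed project name
    libraryDependencies ++= Seq(
      // Flink 1.16.1, matching the flink-core jar shown in the sbt log
      "org.apache.flink" %% "flink-streaming-scala" % "1.16.1",
      "org.apache.flink" %  "flink-clients"         % "1.16.1",
      "org.apache.flink" %  "flink-connector-jdbc"  % "1.16.1",
      // PostgreSQL JDBC driver; the version is an assumption
      "org.postgresql"   %  "postgresql"            % "42.6.0"
    )
  )
```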

2) Environment-related files

compose.yml

version: '3'

services:
  # 1) sbt
  # .sbt, .ivy2 and .m2 cache directories
  sbt:
    build:
      context: ./
    image: sbt
    container_name: sbt
    volumes:
      - ~/.sbt:/root/.sbt
      - ~/.ivy2:/root/.ivy2
      - ~/.m2:/root/.m2
  # 2) PostgreSQL (Instead of Snowflake)
  postgres:
    image: postgres:latest
    container_name: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      PGPASSWORD: password
      POSTGRES_DB: sample
      TZ: "Asia/Tokyo"
    ports:
      - 5431:5432
    volumes:
      - ./init-db:/docker-entrypoint-initdb.d
  # 3) Flink
  jobmanager:
    image: flink:latest
    hostname: jobmanager
    container_name: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:latest
    container_name: task_manager
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
volumes:
  postgres:
    name: v_postgres

./init-db/init.sql

CREATE USER demo_db WITH PASSWORD 'password';
CREATE DATABASE demo_db;
GRANT ALL PRIVILEGES ON DATABASE demo_db TO demo_db;
\c demo_db
create table demo_counter(word varchar, counter int);
insert into demo_counter(word, counter) values
('dummy1', 100),
('dummy2', 2000);

input/word.txt (honestly, any text will do)

APACHE KAFKA
More than 80% of all Fortune 100 companies trust, and use Kafka.

Apache Kafka is an open-source distributed event streaming platform
used by thousands of companies for high-performance data pipelines,
streaming analytics, data integration, and mission-critical applications.

3) Flink code

* See the following related article

Apache Flink ~ Flink to PostgreSQL
https://dk521123.hatenablog.com/entry/2023/08/12/133352
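
For readers who only want a feel for the computation before opening that article, here is a plain-Scala sketch of a word count over lines of text. It is not the Flink job itself and uses no Flink API. The object name WordCountSketch and the tokenization rule (lowercase, split on non-word characters) are assumptions made for illustration. Note also that this computes final totals in memory, whereas a streaming job may emit intermediate running counts.

```scala
object WordCountSketch {
  // Lowercase the line and split on non-word characters (assumed rule).
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Count how many times each word occurs across all lines.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(tokenize).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "APACHE KAFKA",
      "Apache Kafka is an open-source distributed event streaming platform"
    )
    // Print one "word,count" pair per line, sorted by word.
    countWords(lines).toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w,$c") }
  }
}
```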

【3】Verification

[1] Run

$ sudo docker compose up -d
[+] Running 5/5
 ✔ Network hello-flink-postgre_default  Created                                 0.2s 
 ✔ Container jobmanager                 Started                                 1.2s 
 ✔ Container sbt                        Started                                 1.2s 
 ✔ Container postgres                   Started                                 1.0s 
 ✔ Container task_manager               Started                                 1.8s

$ sbt run
[info] welcome to sbt 1.3.13 (Ubuntu Java 11.0.19)
[info] loading settings for project hello-flink-postgre-build from assembly.sbt ...
...
[info] 2023/07/28 07:46:43.676 INFO  - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
# (Alarming at first glance, but) the [error] lines below are only warnings and can be ignored for now
[error] WARNING: An illegal reflective access operation has occurred
[error] WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/user/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/flink/flink-core/1.16.1/flink-core-1.16.1.jar) to field java.lang.String.value
[error] WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
[error] WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
[error] WARNING: All illegal access operations will be denied in a future release
[info] 2023/07/28 07:46:46.170 INFO  - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
...
[info] 2023/07/28 07:46:50.980 INFO  - Done...
[info] 2023/07/28 07:46:50.982 INFO  - Shut down complete.
[info] 2023/07/28 07:46:50.982 INFO  - FileChannelManager removed spill file directory /tmp/flink-io-87405248-7b2c-4a8b-a8b6-77842b72e156
...
[info] 2023/07/28 07:46:51.121 INFO  - Stopped Akka RPC service.
[success] Total time: 11 s, completed Jul 28, 2023, 7:46:52 AM

[2] Check the results

* Connect with a DB client or similar using the following settings
 + Host Name : localhost
 + Port : 5431
 + DB name : demo_db
 + User : postgres
 + Password : password

* Check the data

SELECT COUNT(*) FROM demo_counter;
45
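
The same check can also be scripted instead of using a DB client. The sketch below connects over plain JDBC with the connection settings listed above and prints the rows of demo_counter. It assumes the PostgreSQL JDBC driver is on the classpath; the object name DemoCounterCheck and the helper jdbcUrl are hypothetical names introduced for illustration.

```scala
import java.sql.DriverManager

object DemoCounterCheck {
  // Build a PostgreSQL JDBC URL from host, port, and database name.
  def jdbcUrl(host: String, port: Int, db: String): String =
    s"jdbc:postgresql://$host:$port/$db"

  def main(args: Array[String]): Unit = {
    // Port 5431 is the host-side port mapped to the container's 5432.
    val conn = DriverManager.getConnection(
      jdbcUrl("localhost", 5431, "demo_db"), "postgres", "password")
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery(
        "SELECT word, counter FROM demo_counter ORDER BY counter DESC, word")
      // Print each row as "word<TAB>counter".
      while (rs.next()) println(s"${rs.getString("word")}\t${rs.getInt("counter")}")
    } finally conn.close()
  }
}
```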

SELECT * FROM demo_counter;
"word","counter"
"dummy1","100"
"dummy2","2000"
"trust","1"
"data","1"
...
"apache","2"
"critical","1"

[3] Cleanup

$ sudo docker compose down -v

[+] Running 5/5
 ✔ Container sbt                        Removed                                 0.1s 
 ✔ Container postgres                   Removed                                 1.3s 
 ✔ Container task_manager               Removed                                 2.2s 
 ✔ Container jobmanager                 Removed                                 1.5s 
 ✔ Network hello-flink-postgre_default  Removed                                 0.7s

1) Web UI

* While the containers are running, the Web UI is available at the following URL

http://localhost:8081/
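
The same port also serves Flink's REST API, so the cluster can be checked without a browser. The sketch below requests the /overview endpoint with the HTTP client built into Java 11 and prints the JSON response. The object name FlinkOverviewCheck and the helper endpoint are hypothetical names introduced for illustration, and it assumes the jobmanager container is up and reachable on localhost:8081.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object FlinkOverviewCheck {
  // Build the URL of a REST endpoint on the JobManager.
  def endpoint(host: String, port: Int, path: String): URI =
    URI.create(s"http://$host:$port/${path.stripPrefix("/")}")

  def main(args: Array[String]): Unit = {
    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(endpoint("localhost", 8081, "/overview")).GET().build()
    // Send the request and read the whole body as a string.
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    println(s"HTTP ${response.statusCode()}")
    println(response.body())
  }
}
```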

Related articles

Apache Flink ~ Fundamentals ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ Flink to PostgreSQL
https://dk521123.hatenablog.com/entry/2023/08/12/133352
Docker compose ~ PostgreSQL
https://dk521123.hatenablog.com/entry/2023/07/20/025544
Kafka Connect ~ Environment Setup / Docker Compose Edition ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921