■ Introduction
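I've been working with Apache Flink, but there is very little information out there. Raise the bar even slightly, for example by loading data processed with Flink into a database, and there is practically nothing. So this time I'll cover sample code for Apache Flink + PostgreSQL that I put together through trial and error. To spare you a tedious environment setup, the sample brings everything up in one shot with docker compose, and running it takes nothing more than sbt run. (Samples like this seem like they should exist, but they are surprisingly hard to find...)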
Table of Contents
【1】Goal of this article
  1) Related articles
【2】Sample
  1) Folder structure
  2) Environment files
  3) Flink code
【3】Verification
  1) Web UI
【1】Goal of this article
[1] Build a simple Flink + PostgreSQL sample
[2] Set up the environment in one shot with docker compose
1) Related articles
* This article partly builds on the following related article:
Docker compose ~ PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/07/20/025544
【2】Sample
* For running Flink on Docker, see the official Flink documentation.
* The full code is available on GitHub:
https://github.com/dk521123/HelloWorldForFlinkWithPostgreSQL
1) Folder structure
├── build.sbt
├── compose.yml ... Docker compose file
├── init-db
│   └── init.sql ... SQL to initialize DB
├── input
│   └── word.txt ... Input file
└── src
    └── main
        ├── resources
        │   └── log4j2.xml
        └── scala
            └── dk.com.HelloFlinkPostgre.scala ... Flink code
2) Environment files
compose.yaml
version: '3'
services:
  # 1) sbt
  # .sbt, .ivy2 and .m2 cache directories
  sbt:
    build:
      context: ./
    image: sbt
    container_name: sbt
    volumes:
      - ~/.sbt:/root/.sbt
      - ~/.ivy2:/root/.ivy2
      - ~/.m2:/root/.m2
  # 2) PostgreSQL (Instead of Snowflake)
  postgres:
    image: postgres:latest
    container_name: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      PGPASSWORD: password
      POSTGRES_DB: sample
      TZ: "Asia/Tokyo"
    ports:
      - 5431:5432
    volumes:
      - ./init-db:/docker-entrypoint-initdb.d
  # 3) Flink
  jobmanager:
    image: flink:latest
    hostname: jobmanager
    container_name: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:latest
    container_name: task_manager
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
volumes:
  postgres:
    name: v_postgres
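The compose file publishes PostgreSQL on host port 5431 (mapped to the container's 5432) and the Flink jobmanager on 8081. Containers can take a few seconds to come up after `docker compose up -d`, so a quick TCP probe can be handy before running the job. Below is a minimal Scala sketch using only the JDK; the object name `PortCheck` is hypothetical. Note that a successful connect only shows that something accepts TCP connections on the host port: with Docker's port publishing this may happen before PostgreSQL has finished running init.sql.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper: checks whether the ports published in compose.yaml accept TCP connections.
object PortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isPortOpen(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused and connect timeouts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Host-side ports from compose.yaml: PostgreSQL (5431 -> 5432) and the Flink Web UI (8081).
    val targets = Seq("PostgreSQL" -> 5431, "Flink Web UI" -> 8081)
    targets.foreach { case (name, port) =>
      val status = if (isPortOpen("localhost", port)) "open" else "not reachable"
      println(s"$name (localhost:$port): $status")
    }
  }
}
```

A probe like this only tells you whether to wait a bit longer; it does not check credentials or the demo_db database.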
./init-db/init.sql
CREATE USER demo_db WITH PASSWORD 'password';
CREATE DATABASE demo_db;
GRANT ALL PRIVILEGES ON DATABASE demo_db TO demo_db;

\c demo_db

create table demo_counter(word varchar, counter int);
insert into demo_counter(word, counter)
values
  ('dummy1', 100),
  ('dummy2', 2000);
input/input.txt (honestly, any text will do)
APACHE KAFKA More than 80% of all Fortune 100 companies trust, and use Kafka. Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
3) Flink code
* See the following related article for the code itself:
Apache Flink ~ Flink to PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/08/12/133352
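The actual job lives in the linked article and the GitHub repository. For orientation, here is a rough sketch of what such a job can look like with the Flink 1.16 Scala DataStream API (the sbt run log in 【3】 shows flink-core 1.16.1) and the JDBC connector's `JdbcSink`. It is not the author's code: the object name `HelloFlinkPostgreSketch`, the input path, the tokenization, and the batch size are assumptions, and it assumes `flink-streaming-scala`, `flink-clients`, `flink-connector-jdbc`, and the PostgreSQL JDBC driver are declared in build.sbt.

```scala
import java.sql.PreparedStatement

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala._

// Hypothetical sketch, not the code from the linked article.
object HelloFlinkPostgreSketch {
  // Lowercases a line and splits it into words, dropping empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input path; the folder tree lists input/word.txt.
    val counts: DataStream[(String, Int)] = env
      .readTextFile("input/word.txt")
      .flatMap(tokenize(_))
      .map(word => (word, 1))
      .keyBy(_._1)
      .sum(1)

    // Binds each (word, counter) record to the INSERT's two placeholders.
    val statementBuilder = new JdbcStatementBuilder[(String, Int)] {
      override def accept(ps: PreparedStatement, record: (String, Int)): Unit = {
        ps.setString(1, record._1)
        ps.setInt(2, record._2)
      }
    }

    counts.addSink(
      JdbcSink.sink[(String, Int)](
        "INSERT INTO demo_counter (word, counter) VALUES (?, ?)",
        statementBuilder,
        JdbcExecutionOptions.builder().withBatchSize(100).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          // 5431 is the host-side port published for PostgreSQL in compose.yaml.
          .withUrl("jdbc:postgresql://localhost:5431/demo_db")
          .withDriverName("org.postgresql.Driver")
          .withUsername("postgres")
          .withPassword("password")
          .build()
      )
    )

    env.execute("HelloFlinkPostgre sketch")
  }
}
```

In streaming mode, `sum` emits an updated count every time a word arrives, so this sketch inserts one row per update rather than one row per distinct word. Connecting as `postgres` mirrors the settings used in the verification step of 【3】.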
【3】Verification
[1] Run
$ sudo docker compose up -d
[+] Running 5/5
✔ Network hello-flink-postgre_default Created 0.2s
✔ Container jobmanager Started 1.2s
✔ Container sbt Started 1.2s
✔ Container postgres Started 1.0s
✔ Container task_manager Started 1.8s

$ sbt run
[info] welcome to sbt 1.3.13 (Ubuntu Java 11.0.19)
[info] loading settings for project hello-flink-postgre-build from assembly.sbt ...
...
[info] 2023/07/28 07:46:43.676 INFO - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

# (Startling at first, but) you can ignore these errors for now
[error] WARNING: An illegal reflective access operation has occurred
[error] WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/user/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/flink/flink-core/1.16.1/flink-core-1.16.1.jar) to field java.lang.String.value
[error] WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
[error] WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
[error] WARNING: All illegal access operations will be denied in a future release
[info] 2023/07/28 07:46:46.170 INFO - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
...
[info] 2023/07/28 07:46:50.980 INFO - Done...
[info] 2023/07/28 07:46:50.982 INFO - Shut down complete.
[info] 2023/07/28 07:46:50.982 INFO - FileChannelManager removed spill file directory /tmp/flink-io-87405248-7b2c-4a8b-a8b6-77842b72e156
...
[info] 2023/07/28 07:46:51.121 INFO - Stopped Akka RPC service.
[success] Total time: 11 s, completed Jul 28, 2023, 7:46:52 AM
[2] Check the results
* Connect with a DB client (or similar) using the following settings:
  + Host Name : localhost
  + Port : 5431
  + DB name : demo_db
  + User : postgres
  + Password : password
* Check the data:

SELECT COUNT(*) FROM demo_counter;
45

SELECT * FROM demo_counter;
"word","counter"
"dummy1","100"
"dummy2","2000"
"trust","1"
"data","1"
...
"apache","2"
"critical","1"
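Instead of a GUI client, the same check can be scripted. Here is a minimal Scala sketch using plain JDBC (`java.sql`) with the connection settings listed above; it assumes the PostgreSQL JDBC driver (`org.postgresql:postgresql`) is on the classpath, and `CheckDemoCounter` is a hypothetical name.

```scala
import java.sql.{Connection, DriverManager}

import scala.collection.mutable.ArrayBuffer

// Hypothetical helper that prints the contents of demo_counter.
object CheckDemoCounter {
  // Host port 5431 maps to the container's 5432 (see compose.yaml).
  val Url = "jdbc:postgresql://localhost:5431/demo_db"

  // Reads all (word, counter) rows sorted by word; the caller owns and closes the connection.
  def fetchCounts(conn: Connection): Seq[(String, Int)] = {
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery("SELECT word, counter FROM demo_counter ORDER BY word")
      val rows = ArrayBuffer.empty[(String, Int)]
      while (rs.next()) rows += ((rs.getString("word"), rs.getInt("counter")))
      rows.toList
    } finally {
      stmt.close() // closing the Statement also closes its ResultSet
    }
  }

  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(Url, "postgres", "password")
    try {
      val rows = fetchCounts(conn)
      println(s"rows: ${rows.size}")
      rows.foreach { case (word, counter) => println(s"$word\t$counter") }
    } finally {
      conn.close()
    }
  }
}
```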
[3] Clean up
$ sudo docker compose down -v
[+] Running 5/5
✔ Container sbt Removed 0.1s
✔ Container postgres Removed 1.3s
✔ Container task_manager Removed 2.2s
✔ Container jobmanager Removed 1.5s
✔ Network hello-flink-postgre_default Removed 0.7s
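1) Web UI
* The Flink Web UI can be opened in a browser via port 8081, which compose.yaml publishes for the jobmanager service.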
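The Web UI is served by the jobmanager's REST API on the same port, so the cluster state can also be fetched programmatically. Here is a minimal sketch with the JDK 11 `java.net.http.HttpClient` (the run log shows Java 11.0.19), calling Flink's REST endpoint `/overview`, which returns a JSON summary of the cluster. `FlinkOverview` is a hypothetical name, and the base URL assumes the 8081 port mapping in compose.yaml.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.time.Duration

// Hypothetical helper that fetches the cluster overview from the Flink REST API.
object FlinkOverview {
  // Builds the /overview URI, tolerating a trailing slash on the base URL.
  def overviewUri(base: String): URI = URI.create(base.stripSuffix("/") + "/overview")

  def main(args: Array[String]): Unit = {
    val client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(3)).build()
    val request = HttpRequest.newBuilder(overviewUri("http://localhost:8081")).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    // Prints the status code and the raw JSON (e.g. task manager and slot counts).
    println(s"HTTP ${response.statusCode()}")
    println(response.body())
  }
}
```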
Related articles
Apache Flink ~ Basic Knowledge ~
https://dk521123.hatenablog.com/entry/2023/03/01/235100
Apache Flink ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/29/000000
Apache Flink ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/07/23/161621
Apache Flink ~ Flink to PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/08/12/133352
Docker compose ~ PostgreSQL ~
https://dk521123.hatenablog.com/entry/2023/07/20/025544
Kafka Connect ~ Environment Setup / Docker compose ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921