■ Introduction
I have been chipping away at building a full local Flink -> Kafka/Kafka Connect -> PostgreSQL environment with Docker, and it is finally done. Since this is a development environment, Web UIs for Kafka and PostgreSQL are included as well.
Table of contents
【0】Github
【1】Sample
【2】Registering connection info with Kafka Connect
  Note
  Step 1: Register the connection info
  Step 2: Verify
  Appendix: Deleting the registered info
【0】Github
* The complete source is available in the following Github repository
https://github.com/dk521123/HelloWorldForFlinkWithKafkaNPostgreSQL
【1】Sample
docker-compose.yml

version: "3"
services:
  # [1-1] Kafka - zookeeper
  zookeeper:
    image: docker.io/confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      TZ: Asia/Tokyo
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
  # [1-2] Kafka
  broker:
    image: docker.io/confluentinc/cp-kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Tokyo
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:19092,PLAINTEXT_HOST://localhost:9092
      CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_HOSTNAME: broker
  # [1-3] Kafka - CLI
  cli:
    image: docker.io/confluentinc/cp-kafka:latest
    hostname: cli
    container_name: cli
    depends_on:
      - broker
    entrypoint: /bin/bash
    tty: true
  # [1-4] Kafka connect
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:latest
    hostname: connect
    container_name: kafka-connect
    ports:
      - 8083:8083
    links:
      - broker
      - zookeeper
      - postgres
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:19092
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-group"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./inits/init-kafka-connect/jars:/etc/kafka-connect/jars
      - ./inits/init-kafka-connect/config:/etc/kafka-connect/config
  # [1-5] Kafka to initialize
  init-kafka:
    image: confluentinc/cp-kafka:latest
    container_name: init-kafka
    depends_on:
      - broker
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server broker:19092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server broker:19092 --create --if-not-exists --topic demo_counter --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server broker:19092 --list
      "
  # [1-6] Kafka Web UI
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 3000:8080
    depends_on:
      - broker
    restart: always
    environment:
      TZ: Asia/Tokyo
      KAFKA_CLUSTERS_0_NAME: broker-1
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:19092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
  # [2-1] PostgreSQL
  postgres:
    image: postgres:latest
    container_name: postgres
    hostname: postgresql
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      PGPASSWORD: password
      POSTGRES_DB: sample
      TZ: "Asia/Tokyo"
    ports:
      - 5431:5432
    volumes:
      - volume_postgres:/var/lib/postgresql/data
      - ./inits/init-db:/docker-entrypoint-initdb.d
  # [2-2] PostgreSQL Web UI (PgAdmin)
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin
    hostname: pgadmin
    restart: always
    ports:
      - 18080:80
    volumes:
      - volume_pgadmin:/var/lib/pgadmin
      # pre-configured servers/connections
      - ./inits/init-pgadmin/servers.json:/pgadmin4/servers.json
      # passwords for the connections in this file
      - ./inits/init-pgadmin/pgpass:/pgpass
    environment:
      PGADMIN_DEFAULT_EMAIL: demo@sample.com
      PGADMIN_DEFAULT_PASSWORD: password
    depends_on:
      - postgres

volumes:
  volume_postgres:
    name: v_postgres
  volume_pgadmin:
    name: v_pgadmin
【2】Registering connection info with Kafka Connect
* Use the Kafka Connect REST API
Kafka Connect ~ Rest API ~
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Note
Wait a while after running "sudo docker compose up -d" before executing the registration; the Connect worker needs time before its REST API is reachable.
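Instead of waiting a fixed amount of time, the wait can be scripted by polling the REST endpoint. A minimal sketch, assuming curl on the host; the function name and the retry/interval values are my own, not part of this setup:

```shell
#!/bin/sh
# Poll the Kafka Connect REST endpoint until the worker answers,
# instead of guessing how long to wait after `docker compose up -d`.
wait_for_connect() {
  url="$1"
  retries="${2:-30}"   # number of attempts, 2 seconds apart
  i=0
  while [ "$i" -lt "$retries" ]; do
    if curl -fsS "$url" > /dev/null 2>&1; then
      echo "ready"
      return 0
    fi
    i=$((i + 1))
    sleep 2
  done
  echo "timed out"
  return 1
}

# Usage:
#   wait_for_connect http://localhost:8083/connectors && <register the connector>
```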
Step 1: Register the connection info
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
  "name": "demo-jdbc-sink",
  "config": {
    "pk.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:postgresql://postgresql:5432/demo_db",
    "connection.user": "postgres",
    "connection.password": "password",
    "insert.mode": "insert",
    "auto.create": "true",
    "topics": "demo_counter"
  }
}' \
  http://localhost:8083/connectors
https://docs.confluent.io/ja-jp/platform/7.1/connect/userguide.html
https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html
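Inlining the JSON in the curl command is easy to mistype. One option is to keep the config in a file and validate it before POSTing. A sketch, assuming python3 on the host; the file name demo-jdbc-sink.json is just an example:

```shell
#!/bin/sh
# Keep the connector config in a file instead of inlining it in curl.
# The config itself is the same one registered in Step 1.
cat > demo-jdbc-sink.json <<'EOF'
{
  "name": "demo-jdbc-sink",
  "config": {
    "pk.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:postgresql://postgresql:5432/demo_db",
    "connection.user": "postgres",
    "connection.password": "password",
    "insert.mode": "insert",
    "auto.create": "true",
    "topics": "demo_counter"
  }
}
EOF

# Fail fast on malformed JSON before hitting the REST API
python3 -m json.tool demo-jdbc-sink.json > /dev/null && echo "valid JSON"

# Then register it:
# curl -X POST -H "Content-Type: application/json" \
#      --data @demo-jdbc-sink.json http://localhost:8083/connectors
```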
Step 2: Verify
# curl -s http://localhost:8083/connectors/(KAFKA_CONNECT_NAME)/status/
curl -s http://localhost:8083/connectors/demo-jdbc-sink/status/

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:8083"}],"type":"sink"}
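The status JSON can also be checked from a script. A small sketch that pulls out just the connector state field, assuming python3 on the host (jq may not be installed); the helper name connector_state is my own:

```shell
#!/bin/sh
# Read Kafka Connect status JSON on stdin and print only the connector state.
connector_state() {
  python3 -c 'import sys, json; print(json.load(sys.stdin)["connector"]["state"])'
}

# Usage:
#   curl -s http://localhost:8083/connectors/demo-jdbc-sink/status/ | connector_state
#   (prints RUNNING when the connector is healthy)
```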
Appendix: Deleting the registered info
# DELETE /connectors/(string:name)/
curl -X DELETE -s http://localhost:8083/connectors/demo-jdbc-sink
References
https://qiita.com/turupon/items/540be949ead3f4065553
Related articles
Kafka Connect ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Rest API ~
https://dk521123.hatenablog.com/entry/2023/05/31/000000
How to debug Kafka Connect
https://dk521123.hatenablog.com/entry/2023/08/15/215636
Apache Kafka ~ Environment setup ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Environment setup / Docker compose ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ Getting started ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Docker compose ~ Pgweb/pgAdmin ~
https://dk521123.hatenablog.com/entry/2023/08/10/111919
Apache Flink ~ Environment setup / Docker compose ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039