【Kafka】Kafka Connect ~ 環境構築 / Docker compose編 ~

■ はじめに

ローカル環境に、Dockerで
Flink -> Kafka/Kafka Connect -> PostgreSQL
の環境一式作るのを、コツコツやってきたのだが
やっとできた、、、

あと、ついでに、開発用なんで
KafkaとPostgreSQLのWeb UIも導入している

目次

【0】GitHub
【1】サンプル
【2】Kafka connectへのアクセス情報の登録
 注意点
 Step1:アクセス情報の登録
 Step2: 動作確認
 補足:登録情報を削除する場合

【0】GitHub

* ソース全体は、以下のGitHubにあげている

https://github.com/dk521123/HelloWorldForFlinkWithKafkaNPostgreSQL

【1】サンプル

version: "3"

services:
  # [1-1] Kafka - ZooKeeper (cluster coordination for the Kafka broker)
  zookeeper:
    image: docker.io/confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      # Env values are kept as quoted strings so the YAML parser never
      # retypes them (Compose passes env vars as strings anyway).
      TZ: "Asia/Tokyo"
      ZOOKEEPER_SERVER_ID: "1"
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
      ZOOKEEPER_INIT_LIMIT: "5"
      ZOOKEEPER_SYNC_LIMIT: "2"
  # [1-2] Kafka broker
  # Two listeners: 19092 for container-to-container traffic (advertised as
  # broker:19092), 9092 for clients on the Docker host (localhost:9092).
  broker:
    image: docker.io/confluentinc/cp-kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      TZ: "Asia/Tokyo"
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:19092,PLAINTEXT_HOST://localhost:9092"
      CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
      # Single-broker dev cluster: all internal topics use replication factor 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      # JMX exposed for kafka-ui metrics collection (see KAFKA_CLUSTERS_0_METRICS_PORT).
      KAFKA_JMX_PORT: "9997"
      KAFKA_JMX_HOSTNAME: "broker"
  # [1-3] Kafka - CLI
  # Idle container kept alive (tty + bash) for running kafka-* commands manually.
  cli:
    image: docker.io/confluentinc/cp-kafka:latest
    hostname: cli
    container_name: cli
    depends_on:
      - broker
    entrypoint: /bin/bash
    tty: true
  # [1-4] Kafka Connect worker (REST API on 8083)
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:latest
    hostname: connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    # NOTE(review): `links` is legacy Compose syntax; on the default network,
    # service-name DNS plus `depends_on` would suffice. Kept as-is for
    # compatibility with the existing setup — confirm before removing.
    links:
      - broker
      - zookeeper
      - postgres
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:19092"
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: "connect-group"
      # Internal topics where the worker stores connector configs/offsets/status.
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      # NOTE(review): internal.key/value.converter settings are deprecated in
      # recent Connect versions and ignored there — harmless but removable.
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      # DEBUG is very verbose; fine for local debugging, lower for normal use.
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      # Connector plugin JARs (e.g. the JDBC sink) and extra config.
      - ./inits/init-kafka-connect/jars:/etc/kafka-connect/jars
      - ./inits/init-kafka-connect/config:/etc/kafka-connect/config
  # [1-5] One-shot init container: waits for the broker, then creates topics.
  init-kafka:
    image: confluentinc/cp-kafka:latest
    container_name: init-kafka
    depends_on:
      - broker
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server broker:19092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server broker:19092 --create --if-not-exists --topic demo_counter --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server broker:19092 --list
      "
  # [1-6] Kafka Web UI (http://localhost:3000)
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "3000:8080"
    depends_on:
      - broker
    restart: always
    environment:
      TZ: "Asia/Tokyo"
      KAFKA_CLUSTERS_0_NAME: "broker-1"
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "broker:19092"
      # Points at the broker's JMX port (KAFKA_JMX_PORT above).
      KAFKA_CLUSTERS_0_METRICS_PORT: "9997"
  # [2-1] PostgreSQL (host port 5431 -> container 5432)
  postgres:
    image: postgres:latest
    container_name: postgres
    hostname: postgresql
    restart: always
    environment:
      # Dev-only credentials; do not reuse outside local development.
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      PGPASSWORD: password
      POSTGRES_DB: sample
      TZ: "Asia/Tokyo"
    ports:
      - "5431:5432"
    volumes:
      - volume_postgres:/var/lib/postgresql/data
      # SQL/shell scripts here run once on first container start.
      - ./inits/init-db:/docker-entrypoint-initdb.d
  # [2-2] PostgreSQL Web UI (pgAdmin, http://localhost:18080)
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin
    hostname: pgadmin
    restart: always
    ports:
      - "18080:80"
    volumes:
      - volume_pgadmin:/var/lib/pgadmin
      - ./inits/init-pgadmin/servers.json:/pgadmin4/servers.json # pre-configured servers/connections
      - ./inits/init-pgadmin/pgpass:/pgpass # passwords for the connections in this file
    environment:
      # Dev-only login for the pgAdmin web UI.
      PGADMIN_DEFAULT_EMAIL: demo@sample.com
      PGADMIN_DEFAULT_PASSWORD: password
    depends_on:
      - postgres
volumes:
  volume_postgres:
    name: v_postgres
  volume_pgadmin:
    name: v_pgadmin

【2】Kafka connectへのアクセス情報の登録

* Kafka connect の REST API を使う

Kafka Connect ~ Rest API
https://dk521123.hatenablog.com/entry/2023/05/31/000000

注意点

「sudo docker compose up -d」直後は Kafka Connect の REST API がまだ起動中で
接続に失敗することがあるため、暫く待ってから実行したほうがいい
(curl http://localhost:8083/ が応答を返すことを確認してから登録するとよい)

Step1:アクセス情報の登録

# Registers a JDBC sink connector ("demo-jdbc-sink") that writes the
# "demo_counter" topic into PostgreSQL via the Kafka Connect REST API.
# NOTE(review): connection.url targets database "demo_db", but the
# docker-compose file sets POSTGRES_DB: sample — confirm that a script under
# ./inits/init-db creates demo_db, or change the URL to match.
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{ "name": "demo-jdbc-sink", "config": { "pk.mode": "record_key", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": 1, "connection.url": "jdbc:postgresql://postgresql:5432/demo_db", "connection.user": "postgres", "connection.password": "password", "insert.mode": "insert", "auto.create": "true", "topics": "demo_counter" } }' \
  http://localhost:8083/connectors

https://docs.confluent.io/ja-jp/platform/7.1/connect/userguide.html
https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html

Step2: 動作確認

# curl -s http://localhost:8083/connectors/(KAFKA_CONNECT_NAME)/status/
curl -s http://localhost:8083/connectors/demo-jdbc-sink/status/

{"name":"demo-jdbc-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:8083"}],"type":"sink"}

補足:登録情報を削除する場合

# DELETE /connectors/(string:name)/
curl -X DELETE -s http://localhost:8083/connectors/demo-jdbc-sink

参考文献

https://qiita.com/turupon/items/540be949ead3f4065553

関連記事

Kafka Connect ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/29/185133
Kafka Connect ~ Rest API
https://dk521123.hatenablog.com/entry/2023/05/31/000000
Kafka Connect での デバッグ方法
https://dk521123.hatenablog.com/entry/2023/08/15/215636
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Docker compose ~ Pgweb/pgAdmin ~
https://dk521123.hatenablog.com/entry/2023/08/10/111919
Apache Flink ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/07/28/220039