【Kafka】Kafka Connect ~ 環境構築編 ~

■ はじめに

https://dk521123.hatenablog.com/entry/2023/04/23/235534
https://dk521123.hatenablog.com/entry/2023/04/24/153846

で、Kafkaの環境構築を行い

https://dk521123.hatenablog.com/entry/2023/08/11/220921

で、Kafka Connectの環境構築をDocker composeで行ったのだが
諸事情で、Kafka Connectの環境構築をマニュアルで行ってみる

なお、動作確認は、以下の関連記事を参照のこと

kafka-python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/24/000309

目次

【1】前提条件
 1)Kafka として使用した compose.yml
【2】構築手順
 1)Kafka の ダウンロード
 2)ライブラリ設定
 3)設定ファイルの作成
【3】起動手順
 1)Kafka起動
 2)Kafka connect の起動

【1】前提条件

* Apache Kafka が構築されていること
 => 今回は、Docker composeで構築したものを想定する

Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846

1)Kafka として使用した compose.yml

compose.yml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
      # For remote debug (https://github.com/ppatierno/kafka-connect-amqp/blob/master/README.md#debugging)
      - "5005:5005"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # For remote debug (https://pleiades.io/help/idea/tutorial-remote-debug.html#dc6af7e)
      JAVA_TOOL_OPTIONS: -Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
  postgres:
    image: postgres:latest
    container_name: postgres
    hostname: postgresql
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      PGPASSWORD: password
      POSTGRES_DB: sample
      TZ: "Asia/Tokyo"
    ports:
      - 5431:5432
    volumes:
      - ./init-db:/docker-entrypoint-initdb.d
  pgweb:
    container_name: pgweb
    hostname: pgweb
    restart: always
    image: sosedoff/pgweb
    ports:
      - "18081:8081"
    environment:
      # Refer to https://github.com/sosedoff/pgweb/wiki/Usage
      # postgres://user:password@host:port/database?sslmode=[mode]
      DATABASE_URL: postgres://postgres:password@postgresql:5432/sample?sslmode=disable
    depends_on:
      - postgres
volumes:
  postgres:
    name: v_postgres

【2】構築手順

1)Kafka の ダウンロード

* 以下から、Kafkaをダウンロードしてくる
 => 今回は「kafka_2.13-2.8.1.tgz」をダウンロードした
 => これをKafka connectとして使う

https://kafka.apache.org/downloads
https://archive.apache.org/dist/kafka/
https://ftp.jaist.ac.jp/pub/apache/kafka/

mkdir ~/kafka
cd ~/kafka
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.13-2.7.2.tgz
tar -xzvf kafka_2.13-2.7.2.tgz
cd kafka_2.13-2.7.2/

2)ライブラリ設定

* 必要なドライバをダウンロードして設定する

[1] ダウンロード

* ドライバをダウンロードする
 => 今回の場合、以下のサイトから
 「confluentinc-kafka-connect-jdbc-10.7.4.zip」をダウンロードする

https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
[2] 設定

* zipを解凍し、<Kafkaの位置>/libs (/home/user/kafka/kafka_2.13-2.7.2/libs)
 配下に必要なJarファイルを追加する
 + kafka-connect-jdbc-10.7.4.jar
 + postgresql-42.4.3.jar

3)設定ファイルの作成

[1] connect-standalone.propertiesの修正

* 既存のconnect-standalone.propertiesの修正する
 => 以下の★部分

connect-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# ★plugin.path=<Kafkaの位置>/libs
plugin.path=/home/user/kafka/kafka_2.13-2.7.2/libs

[2] Sinkの設定ファイルの新規追加
https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html
jdbc-connect-sink.properties

name=local-console-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url:jdbc:postgresql://localhost:5431/sample
connection.user:postgres
connection.password:password
tasks.max:1

# Set your topic
topics=demo_person

# Refer to https://docs.confluent.io/ja-jp/platform/7.1/connect/userguide.html#avro
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
auto.create=true
auto.evolve=true
insert.mode=insert
delete.enabled=true
pk.mode=record_key
pk.fields=id

【3】起動手順

1)Kafka起動

sudo docker compose up -d

# 停止させる場合は
# sudo docker compose down -v

2)Kafka connect の起動

cd ~/kafka/kafka_2.13-2.7.2
./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/jdbc-connect-sink.properties

関連記事

Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Remote debug ~
https://dk521123.hatenablog.com/entry/2023/10/23/125909
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Kafka Connect ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Kafka Connect ~ DB/Schema切替 ~
https://dk521123.hatenablog.com/entry/2023/10/26/235951
kafka-python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/24/000309
ローカル環境のKafkaでのトラブル
https://dk521123.hatenablog.com/entry/2023/10/19/210341
Docker compose ~ Pgweb/pgAdmin ~
https://dk521123.hatenablog.com/entry/2023/08/10/111919
Docker に関するトラブル
https://dk521123.hatenablog.com/entry/2017/09/24/162257