■ はじめに
https://dk521123.hatenablog.com/entry/2023/04/23/235534
https://dk521123.hatenablog.com/entry/2023/04/24/153846
で、Kafkaの環境構築を行い
https://dk521123.hatenablog.com/entry/2023/08/11/220921
で、Kafka Connectの環境構築をDocker composeで行ったのだが 諸事情で、Kafka Connectの環境構築をマニュアルで行ってみる なお、動作確認は、以下の関連記事を参照のこと
kafka-python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/24/000309
目次
【1】前提条件 1)Kafka として使用した compose.yml 【2】構築手順 1)Kafka の ダウンロード 2)ライブラリ設定 3)設定ファイルの作成 【3】起動手順 1)Kafka起動 2)Kafka connect の起動
【1】前提条件
* Apache Kafka が構築されていること => 今回は、Docker composeで構築したものを想定する
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
1)Kafka として使用した compose.yml
compose.yml
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.2 container_name: zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.3.2 container_name: broker ports: # To learn about configuring Kafka for access across networks see # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ - "9092:9092" # For remote debug (https://github.com/ppatierno/kafka-connect-amqp/blob/master/README.md#debugging) - "5005:5005" depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # For remote debug (https://pleiades.io/help/idea/tutorial-remote-debug.html#dc6af7e) JAVA_TOOL_OPTIONS: -Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 postgres: image: postgres:latest container_name: postgres hostname: postgresql restart: always environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: password PGPASSWORD: password POSTGRES_DB: sample TZ: "Asia/Tokyo" ports: - 5431:5432 volumes: - ./init-db:/docker-entrypoint-initdb.d pgweb: container_name: pgweb hostname: pgweb restart: always image: sosedoff/pgweb ports: - "18081:8081" environment: # Refer to https://github.com/sosedoff/pgweb/wiki/Usage # postgres://user:password@host:port/database?sslmode=[mode] DATABASE_URL: postgres://postgres:password@postgresql:5432/sample?sslmode=disable depends_on: - postgres volumes: postgres: name: v_postgres
【2】構築手順
1)Kafka の ダウンロード
* 以下から、Kafkaをダウンロードしてくる => 今回は「kafka_2.13-2.8.1.tgz」をダウンロードした => これをKafka connectとして使う
https://kafka.apache.org/downloads
https://archive.apache.org/dist/kafka/
https://ftp.jaist.ac.jp/pub/apache/kafka/
mkdir ~/kafka cd ~/kafka wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.13-2.7.2.tgz tar -xzvf kafka_2.13-2.7.2.tgz cd kafka_2.13-2.7.2/
2)ライブラリ設定
* 必要なドライバをダウンロードして設定する
[1] ダウンロード
* ドライバをダウンロードする => 今回の場合、以下のサイトから 「confluentinc-kafka-connect-jdbc-10.7.4.zip」をダウンロードする
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
[2] 設定
* zipを解凍し、<Kafkaの位置>/libs (/home/user/kafka/kafka_2.13-2.7.2/libs) 配下に必要なJarファイルを追加する + kafka-connect-jdbc-10.7.4.jar + postgresql-42.4.3.jar
3)設定ファイルの作成
[1] connect-standalone.propertiesの修正
* 既存のconnect-standalone.propertiesの修正する => 以下の★部分
connect-standalone.properties
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, # ★plugin.path=<Kafkaの位置>/libs plugin.path=/home/user/kafka/kafka_2.13-2.7.2/libs
[2] Sinkの設定ファイルの新規追加
https://docs.confluent.io/ja-jp/kafka-connectors/jdbc/10.0/sink-connector/sink_config_options.html
jdbc-connect-sink.properties
name=local-console-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector connection.url:jdbc:postgresql://localhost:5431/sample connection.user:postgres connection.password:password tasks.max:1 # Set your topic topics=demo_person # Refer to https://docs.confluent.io/ja-jp/platform/7.1/connect/userguide.html#avro key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter auto.create=true auto.evolve=true insert.mode=insert delete.enabled=true pk.mode=record_key pk.fields=id
【3】起動手順
1)Kafka起動
sudo docker compose up -d # 停止させる場合は # sudo docker compose down -v
2)Kafka connect の起動
cd ~/kafka/kafka_2.13-2.7.2 ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/jdbc-connect-sink.properties
関連記事
Apache Kafka ~ 基礎知識編 ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Apache Kafka ~ 環境構築編 ~
https://dk521123.hatenablog.com/entry/2023/04/23/235534
Apache Kafka ~ Remote debug ~
https://dk521123.hatenablog.com/entry/2023/10/23/125909
Apache Kafka ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/04/24/153846
Kafka Connect ~ 環境構築 / Docker compose編 ~
https://dk521123.hatenablog.com/entry/2023/08/11/220921
Kafka Connect ~ DB/Schema切替 ~
https://dk521123.hatenablog.com/entry/2023/10/26/235951
kafka-python ~ 入門編 ~
https://dk521123.hatenablog.com/entry/2023/10/24/000309
ローカル環境のKafkaでのトラブル
https://dk521123.hatenablog.com/entry/2023/10/19/210341
Docker compose ~ Pgweb/pgAdmin ~
https://dk521123.hatenablog.com/entry/2023/08/10/111919
Docker に関するトラブル
https://dk521123.hatenablog.com/entry/2017/09/24/162257