【Kafka】kafka-python ~ Getting Started ~

■ Introduction

Kafka / Kafka Connect were set up in

https://dk521123.hatenablog.com/entry/2023/10/22/220717

so this post tries a simple Hello World using kafka-python.

Table of Contents

【1】Prerequisites
【2】Environment setup
【3】Documentation
【4】Samples
 Example 1: Creating data in PostgreSQL from a Producer via Kafka

【1】Prerequisites

* Kafka / Kafka Connect must be set up and running
 => See the following related article

Kafka Connect ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/10/22/220717
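
Before running the sample, it can help to confirm the broker is actually reachable.
A minimal sketch, assuming the broker listens on localhost:9092 as in the related article's setup:

from kafka import KafkaAdminClient

# List existing topics to confirm the broker at localhost:9092 is reachable
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')
print(admin_client.list_topics())
admin_client.close()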

【2】Environment setup

pip install kafka-python
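
To verify the installation, you can print the library version
(kafka-python exposes it as kafka.__version__):

import kafka

# Show the installed kafka-python version
print(kafka.__version__)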

【3】Documentation

https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
[1] KafkaConsumer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
[2] KafkaProducer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
[3] KafkaAdminClient
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
[4] KafkaClient
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
[5] BrokerConnection
https://kafka-python.readthedocs.io/en/master/apidoc/BrokerConnection.html
[6] ClusterMetadata
https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html
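
The sample in 【4】 only uses KafkaProducer ([2]). For reference, here is a
minimal KafkaConsumer ([1]) sketch that reads the same topic back; the topic
name demo_person and the broker address match the sample below, the other
settings are assumptions:

from kafka import KafkaConsumer

# Read the demo_person topic from the beginning
consumer = KafkaConsumer(
  'demo_person',
  bootstrap_servers=['localhost:9092'],
  auto_offset_reset='earliest',
  consumer_timeout_ms=10000)  # stop iterating after 10s without messages

for message in consumer:
  print(f"partition={message.partition} offset={message.offset} value={message.value}")

consumer.close()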

【4】Samples

Example 1: Creating data in PostgreSQL from a Producer via Kafka

The script below sends a record to the demo_person topic; the JDBC sink
connector set up in the related article then writes the record into PostgreSQL.

from kafka import KafkaProducer
import json

def on_success(metadata):
  print(f"Message produced to topic '{metadata.topic}' at partition {metadata.partition} offset {metadata.offset}")

def on_error(e):
  print(f"Error sending message: {e}")

print("Start")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Refer to https://gist.github.com/rmoff/2b922fd1f9baf3ba1d66b98e9dd7b364
data = {
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": False,  # "optional" in a Connect schema is a boolean, not a string
        "field": "id"
      }, {
        "type": "string",
        "optional": False,
        "field": "name"
      }, {
        "type": "int64",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "create_at"
      }, {
        "type": "int64",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "update_at"
      }
    ],
    "optional": False,
    "name": "demo_person"
  },
  "payload": {
    "id": 123,
    "name": "Mike",
    "create_at": 1501834166000,
    "update_at": 1501834166000
  }
}

# Serialize value and key to UTF-8 JSON bytes (KafkaProducer sends raw bytes by default)
value = json.dumps(data).encode('utf-8')
key = json.dumps(123).encode('utf-8')

# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send
# send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
future = producer.send('demo_person', value=value, key=key)
future.add_callback(on_success)
future.add_errback(on_error)
# producer.send('demo_person', value).add_callback(on_success).add_errback(on_error)

producer.flush(timeout=10)
producer.close(timeout=5)
print("Done")

Kafka Connect console log

[2023-10-23 23:50:04,596] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:56)
[2023-10-23 23:50:04,691] INFO Checking PostgreSql dialect for existence of TABLE "demo_person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:589)
[2023-10-23 23:50:04,710] INFO Using PostgreSql dialect TABLE "demo_person" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:597)
[2023-10-23 23:50:04,713] INFO Creating table with sql: CREATE TABLE "demo_person" (
"id" TEXT NOT NULL,
"name" TEXT NOT NULL,
"create_at" TIMESTAMP NOT NULL,
"update_at" TIMESTAMP NOT NULL,
PRIMARY KEY("id")) (io.confluent.connect.jdbc.sink.DbStructure:122)
[2023-10-23 23:50:04,756] INFO Checking PostgreSql dialect for existence of TABLE "demo_person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:589)
[2023-10-23 23:50:04,762] INFO Using PostgreSql dialect TABLE "demo_person" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:597)
[2023-10-23 23:50:04,791] INFO Checking PostgreSql dialect for type of TABLE "demo_person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:883)
[2023-10-23 23:50:04,803] INFO Setting metadata for table "demo_person" to Table{name='"demo_person"', type=TABLE columns=[Column{'create_at', isPrimaryKey=false, allowsNull=false, sqlType=timestamp}, Column{'name', isPrimaryKey=false, allowsNull=false, sqlType=text}, Column{'update_at', isPrimaryKey=false, allowsNull=false, sqlType=timestamp}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=text}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
[2023-10-23 23:50:12,323] INFO WorkerSinkTask{id=local-console-sink-0} Committing offsets asynchronously using sequence number 2: {demo_person-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)

Verifying the table

* Access the following URL and confirm that the demo_person table exists

http://localhost:18081/#

* Confirm that the demo_person table contains the following values
 + id:123
 + name:Mike
 + create_at:2017-08-04T08:09:26Z
 + update_at:2017-08-04T08:09:26Z
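
The create_at / update_at values are the epoch-millisecond fields from the
payload, converted by the Connect Timestamp logical type. A quick check of
the conversion in Python:

from datetime import datetime, timezone

# 1501834166000 ms since the epoch -> 2017-08-04 08:09:26 UTC
print(datetime.fromtimestamp(1501834166000 / 1000, tz=timezone.utc))
# => 2017-08-04 08:09:26+00:00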

Related articles

Apache Kafka ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ Getting Started ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Kafka Connect ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/10/22/220717
Troubleshooting Kafka in a local environment
https://dk521123.hatenablog.com/entry/2023/10/19/210341