■ Introduction
https://dk521123.hatenablog.com/entry/2023/10/22/220717
In the article above we set up Kafka / Kafka Connect, so this time let's try a simple Hello World using kafka-python.
Table of contents
【1】Prerequisites
【2】Environment setup
【3】Documentation
【4】Samples
 Example 1: Create data in PostgreSQL via Kafka from a Producer
【1】Prerequisites
* Kafka / Kafka Connect must already be set up and running
=> See the related article below
Kafka Connect ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/10/22/220717
【2】Environment setup
pip install kafka-python
【3】Documentation
https://kafka-python.readthedocs.io/en/master/apidoc/modules.html
[1] KafkaConsumer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
[2] KafkaProducer
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
[3] KafkaAdminClient
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
[4] KafkaClient
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
[5] BrokerConnection
https://kafka-python.readthedocs.io/en/master/apidoc/BrokerConnection.html
[6] ClusterMetadata
https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html
【4】Samples
Example 1: Create data in PostgreSQL via Kafka from a Producer
from kafka import KafkaProducer
import json


def on_success(metadata):
    print(f"Message produced to topic '{metadata.topic}' at partition {metadata.partition} offset {metadata.offset}")


def on_error(e):
    print(f"Error sending message: {e}")


print("Start")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Refer to https://gist.github.com/rmoff/2b922fd1f9baf3ba1d66b98e9dd7b364
data = {
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": "false",
                "field": "id"
            },
            {
                "type": "string",
                "optional": "false",
                "field": "name"
            },
            {
                "type": "int64",
                "optional": "false",
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "create_at"
            },
            {
                "type": "int64",
                "optional": "false",
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "update_at"
            }
        ],
        "optional": "false",
        "name": "demo_person"
    },
    "payload": {
        "id": 123,
        "name": "Mike",
        "create_at": 1501834166000,
        "update_at": 1501834166000
    }
}

value = json.dumps(data).encode('utf-8')
key = json.dumps(123).encode('utf-8')

# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send
# send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
future = producer.send('demo_person', value=value, key=key)
future.add_callback(on_success)
future.add_errback(on_error)
# producer.send('demo_person', value).add_callback(on_success).add_errback(on_error)

producer.flush(timeout=10)
producer.close(timeout=5)
print("Done")
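The schema/payload envelope above can be factored into a small helper so that multiple rows can be sent without repeating the schema. This is only a sketch: the `DEMO_PERSON_SCHEMA` constant and the `build_record` function are illustrative names (not part of kafka-python), and the schema is assumed to stay fixed to the demo_person fields used above.

```python
import json

# Kafka Connect schema skeleton for the demo_person topic (same fields as the sample above)
DEMO_PERSON_SCHEMA = {
    "type": "struct",
    "name": "demo_person",
    "optional": "false",
    "fields": [
        {"type": "int32", "optional": "false", "field": "id"},
        {"type": "string", "optional": "false", "field": "name"},
        {"type": "int64", "optional": "false",
         "name": "org.apache.kafka.connect.data.Timestamp", "version": 1,
         "field": "create_at"},
        {"type": "int64", "optional": "false",
         "name": "org.apache.kafka.connect.data.Timestamp", "version": 1,
         "field": "update_at"},
    ],
}


def build_record(id_, name, create_at_ms, update_at_ms):
    """Wrap one row in the schema/payload envelope and serialize it to bytes."""
    envelope = {
        "schema": DEMO_PERSON_SCHEMA,
        "payload": {
            "id": id_,
            "name": name,
            "create_at": create_at_ms,
            "update_at": update_at_ms,
        },
    }
    return json.dumps(envelope).encode("utf-8")


value = build_record(123, "Mike", 1501834166000, 1501834166000)
print(json.loads(value)["payload"])  # -> {'id': 123, 'name': 'Mike', ...}
```

The bytes returned by `build_record` can then be passed as the `value` argument of `producer.send`, exactly like the hand-built `value` in the sample.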
Kafka Connect console log
[2023-10-23 23:50:04,596] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:56)
[2023-10-23 23:50:04,691] INFO Checking PostgreSql dialect for existence of TABLE "demo_person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:589)
[2023-10-23 23:50:04,710] INFO Using PostgreSql dialect TABLE "demo_person" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:597)
[2023-10-23 23:50:04,713] INFO Creating table with sql: CREATE TABLE "demo_person" ( "id" TEXT NOT NULL, "name" TEXT NOT NULL, "create_at" TIMESTAMP NOT NULL, "update_at" TIMESTAMP NOT NULL, PRIMARY KEY("id")) (io.confluent.connect.jdbc.sink.DbStructure:122)
[2023-10-23 23:50:04,756] INFO Checking PostgreSql dialect for existence of TABLE "demo_person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:589)
[2023-10-23 23:50:04,762] INFO Using PostgreSql dialect TABLE "demo_person" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:597)
[2023-10-23 23:50:04,791] INFO Checking PostgreSql dialect for type of TABLE "demo_person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:883)
[2023-10-23 23:50:04,803] INFO Setting metadata for table "demo_person" to Table{name='"demo_person"', type=TABLE columns=[Column{'create_at', isPrimaryKey=false, allowsNull=false, sqlType=timestamp}, Column{'name', isPrimaryKey=false, allowsNull=false, sqlType=text}, Column{'update_at', isPrimaryKey=false, allowsNull=false, sqlType=timestamp}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=text}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
[2023-10-23 23:50:12,323] INFO WorkerSinkTask{id=local-console-sink-0} Committing offsets asynchronously using sequence number 2: {demo_person-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)
Check the table
* Connect to the database and check that the demo_person table exists
* Confirm that the demo_person table contains the following values
 + id: 123
 + name: Mike
 + create_at: 2017-08-04T08:09:26Z
 + update_at: 2017-08-04T08:09:26Z
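The create_at / update_at values above come from the epoch-millisecond timestamps in the payload (1501834166000); they can be cross-checked with a quick conversion:

```python
from datetime import datetime, timezone

# Convert the epoch-millisecond value sent in the payload to an ISO 8601 UTC string
epoch_ms = 1501834166000
timestamp = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
print(timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"))  # -> 2017-08-04T08:09:26Z
```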
References
https://qiita.com/montblanc18/items/05235c960f3527830415
https://zenn.dev/kyami/articles/546d0701f137c7
Related articles
Apache Kafka ~ Basics ~
https://dk521123.hatenablog.com/entry/2023/04/26/103421
Apache Kafka ~ Introduction ~
https://dk521123.hatenablog.com/entry/2023/05/01/000000
Kafka Connect ~ Environment Setup ~
https://dk521123.hatenablog.com/entry/2023/10/22/220717
Troubleshooting Kafka in a local environment
https://dk521123.hatenablog.com/entry/2023/10/19/210341