0

我开始接触 Apache Kafka (Confluent) 并对模式的使用有一些疑问。首先,我对架构用于验证数据的一般理解是否正确?我对模式的理解是,当“生成”数据时,它会检查键和值是否符合预定义的概念并相应地拆分它们。

我目前的技术设置如下:

Python:

from confluent_kafka import Producer
from config import conf
import json

# create producer
producer = Producer(conf)

producer.produce("datagen-topic", json.dumps({"product":"table","brand":"abc"}))
producer.flush()

在 Confluent 中,我为我的主题设置了一个 json 键模式:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "properties": {
    "brand": {
      "type": "string"
    },
    "product": {
      "type": "string"
    }
  },
  "required": [
    "product",
    "brand"
  ],
  "type": "object"
}

现在,当我生成数据时,Confluent 中的消息仅包含“值”中的内容。Key 和 Header 为空:

{
  "product": "table",
  "brand": "abc"
}

基本上,如果我设置了这个模式并没有什么不同,所以我想它只是在我设置它时不起作用。您能帮我解决我的思维方式错误或我的代码缺少输入的地方吗?

4

1 回答 1

1

Confluent Python 库Producer类不会以任何方式与注册表交互,因此您的消息不会被验证。

您将希望SerializingProducer在示例中使用 - https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/json_producer.py

如果您想要非空键和标头,则需要将它们传递给发送方法

于 2021-04-17T19:40:28.460 回答