我开始接触 Apache Kafka (Confluent) 并对模式的使用有一些疑问。首先,我对架构用于验证数据的一般理解是否正确?我对模式的理解是,当“生成”数据时,它会检查键和值是否符合预定义的概念并相应地拆分它们。
我目前的技术设置如下:
Python:
from confluent_kafka import Producer
from config import conf
import json
# create producer
producer = Producer(conf)
producer.produce("datagen-topic", json.dumps({"product":"table","brand":"abc"}))
producer.flush()
在 Confluent 中,我为我的主题设置了一个 json 键模式:
{
"$schema": "http://json-schema.org/draft-04/schema#",
"properties": {
"brand": {
"type": "string"
},
"product": {
"type": "string"
}
},
"required": [
"product",
"brand"
],
"type": "object"
}
现在,当我生成数据时,Confluent 中的消息仅包含“值”中的内容。Key 和 Header 为空:
{
"product": "table",
"brand": "abc"
}
基本上,如果我设置了这个模式并没有什么不同,所以我想它只是在我设置它时不起作用。您能帮我解决我的思维方式错误或我的代码缺少输入的地方吗?