6

confluent-kafka-python repoAvroProducer中的示例来看,键/值模式似乎是从文件中加载的。也就是说,从这段代码:

from confluent_kafka import avro 
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)

似乎这些文件是独立于 Avro Schema Registry 加载的ValueSchema.avscKeySchema.avsc

这是正确的吗?引用 Avro 模式注册表的 URL,然后从磁盘加载模式以获取键/值有什么意义?

请说清楚。

4

3 回答 3

3

我遇到了同样的问题,最初不清楚本地文件的意义是什么。正如其他答案所提到的,对于第一次写入 Avro 主题或更新主题架构,您需要架构字符串 - 您可以从此处的 Kafka REST 文档中看到这一点。

在注册表中获得模式后,您可以使用 REST 读取它(在本例中我使用 requests Python 模块)并使用 avro.loads() 方法来获取它。我发现这很有用,因为 producer() 函数要求您具有 AvroProducer 的值模式,并且此代码将在没有该本地文件的情况下工作:

get_schema_req_data = requests.get("http://1.2.3.4:8081/subjects/sample_value_schema/versions/latest")
get_schema_req_data.raise_for_status()
schema_string = get_schema_req_data.json()['schema']
value_schema = avro.loads(schema_string)
avroProducer = AvroProducer({'bootstrap.servers': '1.2.3.4:9092', 'schema.registry.url': 'http://1.2.3.4:8081'}, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value={"data" : "that matches your schema" })

希望这可以帮助。

于 2017-05-08T19:07:34.163 回答
1

这只是首先在模式注册表中创建键和值模式的一种方法。您可以先使用 SR REST API 在 SR 中创建它,也可以通过发布新消息来在 SR 中创建新模式或现有模式的新版本。首选哪种方法完全由您选择。

于 2017-04-29T02:46:42.917 回答
0

查看代码并考虑消费者而不是生产者需要注册表中的模式。MessageSerializer 为您在模式注册表中注册模式:)

于 2017-04-28T16:11:16.867 回答