想象有分开的应用程序:生产者和消费者。
生产者代码:
import os
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
avsc_dir = os.path.dirname(os.path.realpath(__file__))
value_schema = avro.load(os.path.join(avsc_dir, "basic_schema.avsc"))
config = {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://0.0.0.0:8081'}
producer = AvroProducer(config=config, default_value_schema=value_schema)
producer.produce(topic='testavro', value={'first_name': 'Andrey', 'last_name': 'Volkonsky'})
basic_schema.avsc文件位于生产者应用程序中。其内容:
{
"name": "basic",
"type": "record",
"doc": "basic schema for tests",
"namespace": "python.test.basic",
"fields": [
{
"name": "first_name",
"doc": "first name",
"type": "string"
},
{
"name": "last_name",
"doc": "last name",
"type": "string"
}
]
}
目前,消费者内部的内容并不重要。
我们运行生产者一次,一切正常。然后我想添加年龄字段:
basic_schema.avsc:
{
"name": "basic",
"type": "record",
"doc": "basic schema for tests",
"namespace": "python.test.basic",
"fields": [
{
"name": "first_name",
"doc": "first name",
"type": "string"
},
{
"name": "last_name",
"doc": "last name",
"type": "string"
},
{
"name": "age",
"doc": "age",
"type": "int"
}
]
}
在这里我得到了错误:
confluent_kafka.avro.error.ClientError:不兼容的 Avro 架构:409
他们在这里说https://docs.confluent.io/platform/current/schema-registry/avro.html#summary对于兼容性类型 == BACKWARD 消费者应该首先更新。
我无法从技术上理解。我的意思是我必须将basic_schema.avsc文件复制到消费者并运行它吗?