我在 kafka 的生产者方面工作,以在主题中推送消息。我正在使用confluent-kafka
avro 生产者。
以下是我的架构.avsc
文件。
密钥.avsc
{
"namespace": "io.codebrews.schema.test",
"type": "record",
"name": "Keys",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": "string"
}
]
}
测试.avsc
{
"namespace": "io.codebrews.schema.test",
"type": "record",
"name": "Subscription",
"fields": [
{
"name": "test",
"type": "string"
},
{
"name": "keys",
"type": "io.codebrews.schema.test.Keys"
}
]
}
生产者.py
key_schema, value_schema = load_avro_schema_from_file('Subscription.avsc')
try:
producer = avro.AvroProducer(producer_config, default_key_schema=key_schema, default_value_schema=value_schema)
except Exception as e:
raise e
def load_avro_schema_from_file(schema_file):
key_schema_string = """
{"type": "string"}
"""
key_schema = avro.loads(key_schema_string)
value_schema = avro.load("./avro/" + schema_file)
return key_schema, value_schema
当我尝试注册Keys.avsc
时,它工作正常,没有错误。Test.avsc
但是当我注册后尝试注册时Keys.avsc
。我得到以下错误。
confluent_kafka.avro.error.ClientError:模式解析失败:未知命名模式'io.codebrews.schema.test.Keys',已知名称:['io.codebrews.schema.test.Subscription']。
手动注册架构后。
{
"namespace": "io.codebrews.schema.test",
"type": "record",
"name": "Subscription",
"fields": [
{
"name": "test",
"type": "string"
},
{
"name": "keys",
"type": "Keys"
}
]
}
在我的主题中推送消息时,出现以下错误。
ClientError: Incompatible Avro schema:409 message:{'error_code': 409, 'message': '正在注册的模式与主题“test-value”的早期模式不兼容。
我在这里做错了吗?
也有人可以帮助我如何在 python 中停止自动模式注册?