0

我在 kafka 的生产者方面工作,以在主题中推送消息。我正在使用confluent-kafkaavro 生产者。

github上喜欢的问题

以下是我的架构.avsc文件。

密钥.avsc

{
    "namespace": "io.codebrews.schema.test",
    "type": "record",
    "name": "Keys",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        }
    ]
}

测试.avsc

{
    "namespace": "io.codebrews.schema.test",
    "type": "record",
    "name": "Subscription",
    "fields": [
        {
            "name": "test",
            "type": "string"
        },
        {
            "name": "keys",
            "type": "io.codebrews.schema.test.Keys"
        }
    ]
}

生产者.py

key_schema, value_schema = load_avro_schema_from_file('Subscription.avsc')

try:
    producer = avro.AvroProducer(producer_config, default_key_schema=key_schema, default_value_schema=value_schema)
except Exception as e:
    raise e

def load_avro_schema_from_file(schema_file):
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load("./avro/" + schema_file)

    return key_schema, value_schema

当我尝试注册Keys.avsc时,它工作正常,没有错误。Test.avsc但是当我注册后尝试注册时Keys.avsc。我得到以下错误。

confluent_kafka.avro.error.ClientError:模式解析失败:未知命名模式'io.codebrews.schema.test.Keys',已知名称:['io.codebrews.schema.test.Subscription']。

手动注册架构后。

{
    "namespace": "io.codebrews.schema.test",
    "type": "record",
    "name": "Subscription",
    "fields": [
        {
            "name": "test",
            "type": "string"
        },
        {
            "name": "keys",
            "type": "Keys"
        }
    ]
}

在我的主题中推送消息时,出现以下错误。

ClientError: Incompatible Avro schema:409 message:{'error_code': 409, 'message': '正在注册的模式与主题“test-value”的早期模式不兼容。

我在这里做错了吗?

也有人可以帮助我如何在 python 中停止自动模式注册?

4

1 回答 1

0

该错误与解析器有关,而不是注册表注册...

您的 AVSC 文件需要完全包含所有记录类型。当解析器读取一个文件时,它无法了解其他文件

如果您从 AVDL 文件开始,然后将其转换为 AVSC,则记录会正确嵌入到外部记录中。

具体来说,

"fields" :[{
  "name": "keys",
  "type": {
       "type": "record", 
       "namespace": "io.codebrews.schema.test.Keys", 
       ... 
  } 
}
于 2021-09-24T02:51:05.910 回答