2

我已经在模式注册表中注册了一个模式,我可以register()像这样使用它,

from schema_registry.client import SchemaRegistryClient, schema

subject_name = "new-schema"
schema_url = "https://{{ schemaRegistry }}:8081" 
sr = SchemaRegistryClient(schema_url)

schema = schema.AvroSchema({
    "namespace": "example.avro",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "fname", "type": "string"},
        {"name": "favorite_number",  "type": "int"}
    ]
})

my_schema = sr.register(subject_name, schema)

现在我需要用一个新字段来更新这个相同的主题,所以我会得到新的模式 id 和version = 2.

updated_schema = schema.AvroSchema({
    "namespace": "example.avro",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "fname", "type": "string"},
        {"name": "favorite_number",  "type": "int"},
        {"name": "favorite_food",  "type": "string"}
    ]
})

我尝试使用sr.register(subject_name, updated_schema),它会为同一主题引发错误:

AttributeError: 'ClientError' object has no attribute '_get_object_id'
ClientError: Incompatible Avro schema

是的,此功能是注册新模式而不更新。我没有任何更新功能,我不知道该怎么做。那么如何更新架构?任何帮助,将不胜感激。

4

1 回答 1

3

当在主题中注册新模式时,模式注册表会强制执行某些兼容性规则。因此,您需要确保主题的兼容模式与您正在寻找的模式演变相匹配。


使用confluent-kafka-python

from confluent_kafka.schema_registry import SchemaRegistryClient


sr = SchemaRegistryClient("https://schema-registry-host:8081")

# Options are: 
#   - NONE, FULL, BACKWARD, FORWARD, 
#   - BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
sr.set_compatibility("yourSubjectName", "NONE")

使用python-schema-registry-client

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient("https://schema-registry-host:8081")
sr.update_compatibility(level="NONE", subject="yourSubjectName")

有关兼容性类型的完整列表,请参阅Confluent 文档

于 2020-05-21T09:32:07.657 回答