我们有一个与 Kafka 集成的 spark 流应用程序,我正在尝试对其进行优化,因为它会过度调用 Schema Registry 来下载模式。
我们的数据的 avro 模式很少改变,目前我们的应用程序只要有记录进入就会调用模式注册表,这太过分了。
我从 confluent 遇到了CachedSchemaRegistryClient,它看起来很有希望。虽然在研究了它的实现之后,我不确定如何使用它的内置缓存来减少对 Schema Registry 的 REST 调用。
上面的链接会将您带到源代码,这里我将粘贴与将模式附加到 CachedSchemaRegistryClient 的缓存有关的唯一方法。
public synchronized int register(String subject, Schema schema) throws IOException, RestClientException
{
Object schemaIdMap;
if(this.schemaCache.containsKey(subject)) {
schemaIdMap = (Map)this.schemaCache.get(subject);
} else {
schemaIdMap = new HashMap();
this.schemaCache.put(subject, (Map)schemaIdMap);
}
/*
* let's call the above as the FIRST part of this method, below as the SECOND part
*/
if(((Map)schemaIdMap).containsKey(schema)) {
return ((Integer)((Map)schemaIdMap).get(schema)).intValue();
} else if(((Map)schemaIdMap).size() >= this.identityMapCapacity) {
throw new IllegalStateException("Too many schema objects created for " + subject + "!");
} else {
int id = this.registerAndGetId(subject, schema);
((Map)schemaIdMap).put(schema, Integer.valueOf(id));
return id;
}
}
该方法的目的是向 Schema Registry 以及本地缓存注册一个 schema,并返回其 schemaID;如果架构已在本地存在,则返回 schemaID。如果我们正在注册一个完整的新模式,这将非常有效。
但是在架构已经在架构注册表中注册的情况下(在我们的情况下由另一个应用程序注册),我们只想将架构放在 CachedSchemaRegistryClient 的本地缓存中以便于快速访问 - 我个人认为不支持到今天为止,是否有没有自定义的干净解决方法?
我们考虑过自己维护一个本地缓存,但如果 confluent 可以提供一些东西,我们希望将其保留为最后的手段。
任何建议/想法表示赞赏,在此先感谢。