I'm trying to use the Hortonworks Schema Registry to deserialize some Kafka messages that were serialized by NiFi:
- Record Writer (controller service) used on the NiFi side: AvroRecordSetWriter
- Schema Write Strategy: HWX Content-Encoded Schema Reference
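I can deserialize these messages in other NiFi Kafka consumers, but I'm trying to deserialize them in the Kafka consumer code of my Flink application.
My Flink application's Kafka deserialization handler contains the following: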
```java
final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
// One hour; this key is in seconds (60 * 60 * 1000L would be about 41 days).
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");

// Note: for Avro record schemas the deserializer typically returns an Avro
// GenericRecord rather than a Map, so this cast may need changing.
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);
```
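The expiry keys above end in `_SECS`, which suggests their values are seconds; a value such as `60 * 60 * 1000L` would then mean 3,600,000 seconds (about 41 days) rather than one hour. Since `SchemaRegistryClient` is constructed from a `Map` anyway, the config can also be built as a map directly. The following is a minimal, stdlib-only sketch under the assumption that seconds is the right unit; `RegistryConfigSketch` and `buildConfig` are hypothetical names, and the key strings in `main` are placeholders for the real `SchemaRegistryClient.Configuration.<ENTRY>.name()` values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class RegistryConfigSketch {

    /**
     * Builds the registry client config as a Map directly.
     * The key names are passed in; in real code they would come from
     * SchemaRegistryClient.Configuration.<ENTRY>.name().
     */
    static Map<String, Object> buildConfig(String urlKey,
                                           String versionCacheExpirySecsKey,
                                           String registryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put(urlKey, registryUrl);
        // Assumption: the key is in seconds (per its _SECS suffix), so one hour = 3600.
        config.put(versionCacheExpirySecsKey, TimeUnit.HOURS.toSeconds(1));
        return config;
    }

    public static void main(String[] args) {
        // Placeholder key strings, for illustration only.
        Map<String, Object> config = buildConfig("URL_KEY", "VERSION_CACHE_EXPIRY_SECS_KEY",
                "http://schema_registry_server:7788/api/v1");
        System.out.println(config.get("VERSION_CACHE_EXPIRY_SECS_KEY")); // prints 3600
    }
}
```

Using `TimeUnit` makes the intended unit explicit instead of relying on hand-written multiplication.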
Here is the HWXSchemaRegistry code that deserializes the message:
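```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;

public class HWXSchemaRegistry {
    // Assumption: SLF4J backs _log; its declaration was not shown in the original.
    private static final Logger _log = LoggerFactory.getLogger(HWXSchemaRegistry.class);

    private SchemaRegistryClient client;
    private Map<String, Object> config;
    private AvroSnapshotDeserializer deserializer;
    private static HWXSchemaRegistry hwxSRInstance = null;

    // Lazily creates one shared instance. Synchronized because several Flink
    // subtasks in the same JVM may call it concurrently. Config passed on
    // later calls is ignored once the instance exists.
    public static synchronized HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if (hwxSRInstance == null) {
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        }
        return hwxSRInstance;
    }

    // Decodes one Kafka message; a null reader schema version means the
    // writer's schema version is used.
    public Object deserialize(byte[] message) throws IOException {
        return deserializer.deserialize(new ByteArrayInputStream(message), null);
    }

    // Copies every Properties entry (including non-String values such as Long) into a Map.
    private static Map<String, Object> properties2Map(Properties config) {
        Enumeration<Object> keys = config.keys();
        Map<String, Object> configMap = new HashMap<>();
        while (keys.hasMoreElements()) {
            Object key = keys.nextElement();
            configMap.put(key.toString(), config.get(key));
        }
        return configMap;
    }

    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);
        // Default Avro deserializer provided by the registry client.
        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
    }
}
```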
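The `properties2Map` helper iterates over `keys()` by hand. A shorter stdlib-only equivalent uses `Properties.forEach`. The seemingly simpler `stringPropertyNames()` would not work here, because it only returns keys whose values are also Strings, so the `Long` cache settings would be silently dropped. A minimal sketch; `PropertiesToMap` and `toMap` are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropertiesToMap {

    /**
     * Copies every entry of a Properties object into a Map<String, Object>,
     * keeping non-String values (e.g. Long cache sizes) as they are.
     */
    static Map<String, Object> toMap(Properties props) {
        Map<String, Object> map = new HashMap<>();
        props.forEach((key, value) -> map.put(key.toString(), value));
        return map;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("cache.size", 10L);
        props.put("url", "http://schema_registry_server:7788/api/v1");

        Map<String, Object> map = toMap(props);
        System.out.println(map.get("cache.size") instanceof Long);                // prints true
        // stringPropertyNames() skips entries whose value is not a String:
        System.out.println(props.stringPropertyNames().contains("cache.size")); // prints false
    }
}
```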
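But I get an HTTP 404 error (schema not found). I suspect a "protocol" incompatibility between the NiFi configuration and the HWX Schema Registry client implementation: the schema identifier bytes the client reads would then not correspond to any schema on the server, or something along those lines.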
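One way to test this hypothesis is to look at the first bytes of a raw Kafka message. NiFi's documentation describes the HWX Content-Encoded Schema Reference as a single protocol-version byte, followed by 8 bytes for the schema identifier and 4 bytes for the schema version. The sketch below decodes that header with the JDK only, so the identifier and version can be compared with what the registry actually holds. It assumes big-endian byte order (the `ByteBuffer` default); `HwxHeaderPeek` and `peek` are hypothetical names.

```java
import java.nio.ByteBuffer;

public class HwxHeaderPeek {

    /** Assumed header size: 1 (protocol) + 8 (schema id) + 4 (schema version) = 13 bytes. */
    static final int HEADER_LENGTH = 1 + Long.BYTES + Integer.BYTES;

    /** Decoded header fields (a plain class so it compiles on Java 8). */
    static final class Header {
        final byte protocol;
        final long schemaId;
        final int schemaVersion;

        Header(byte protocol, long schemaId, int schemaVersion) {
            this.protocol = protocol;
            this.schemaId = schemaId;
            this.schemaVersion = schemaVersion;
        }

        @Override
        public String toString() {
            return "protocol=" + protocol + ", schemaId=" + schemaId + ", schemaVersion=" + schemaVersion;
        }
    }

    /** Reads the assumed 13-byte header from the start of a raw Kafka message value. */
    static Header peek(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("message shorter than " + HEADER_LENGTH + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(message, 0, HEADER_LENGTH); // big-endian by default
        // Java evaluates arguments left to right: protocol, then id, then version.
        return new Header(buf.get(), buf.getLong(), buf.getInt());
    }

    public static void main(String[] args) {
        // Hypothetical message: header (protocol 1, schema id 42, version 3) + two payload bytes.
        byte[] message = ByteBuffer.allocate(HEADER_LENGTH + 2)
                .put((byte) 1).putLong(42L).putInt(3)
                .put((byte) 0x02).put((byte) 0x61)
                .array();
        System.out.println(peek(message)); // prints protocol=1, schemaId=42, schemaVersion=3
    }
}
```

Logging `peek(message)` for a single record in the Flink job and looking up that id and version in the registry UI would show whether the 404 comes from a schema that genuinely is not there or from the client interpreting the bytes differently.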
Can anyone help?
Thank you.
```
Caused by: javax.ws.rs.NotFoundException: HTTP 404 Not Found
    at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1069)
    at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:866)
    at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:750)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
    at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:748)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$14.run(SchemaRegistryClient.java:1054)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$14.run(SchemaRegistryClient.java:1051)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getEntities(SchemaRegistryClient.java:1051)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:872)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:676)
    at HWXSchemaRegistry.<init>(HWXSchemaRegistry.java:56)
    at HWXSchemaRegistry.getInstance(HWXSchemaRegistry.java:26)
    at SchemaService.deserialize(SchemaService.java:70)
    at SchemaService.deserialize(SchemaService.java:26)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource. ...
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
```