1

我有一个 SchemaRegistry 和一个 KafkaBroker,我使用 Avro v1.8.1 从中提取数据。对于反序列化,我一直在使用 Confluent 的KafkaAvroDeserializer。现在我打算重构我的代码以使用 Alpakka 提供的Elasticsearch API,但不幸的是,这会破坏反序列化,因为它会导致 NullPointerExceptions:

线程 "main" org.apache.kafka.common.errors.SerializationException 中的异常:在偏移量 0 处反序列化分区 topic-0 的键/值时出错。如果需要,请寻找过去的记录以继续消费。引起:org.apache.kafka.common.errors.SerializationException:为 id 2 反序列化 Avro 消息时出错 引起:io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116) 处的 java.lang.NullPointerException io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 在 org.apache.kafka.common.serialization.Deserializer.在 org.apache.kafka.clients.consumer 反序列化(Deserializer.java:58)。

我一直在使用 Alpakka 的 ConsumerSettings API,如本示例中所述:

val system = ActorSystem.create();

// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

这些设置会导致 NullPointerExceptions,而这个香草 Kafka Consumer 道具工作正常:

val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);

在工作示例中,ConsumerRecords 的值成功反序列化为 AvroMavenPlugin 从模式生成的类。

任何提示表示赞赏!

4

1 回答 1

2

我认为您需要提取new KafkaAvroDeserializer()它自己的变量,然后调用该.configure()实例上的方法以传入非空注册表 URL。

然后将配置好的实例传入ConsumerSettings.create

FWIW,根据您的需要,Kafka Connect 可以很好地加载 Elasticsearch

于 2019-06-04T02:08:17.457 回答