
I am building a Spring Boot server with Spring Kafka (1.3.2.RELEASE), Apache Avro (1.8.2), and io.confluent's Schema Registry (3.1.2). Whenever the Kafka listener receives a message, it reads the schema id embedded in the message and fetches the corresponding Avro schema from the registry server by that id. The problem: if the Schema Registry server is down, my listener keeps firing HTTP requests at the registry to fetch the schema for every message it receives (printing a flood of error logs), and because the offset never advances, it blocks all subsequent Kafka messages.
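For context, here is a minimal sketch of the consumer setup being described (class and property names are from kafka-avro-serializer 3.1.x and the Kafka consumer API; the broker address, registry URL, and group id are placeholders):

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerProps {

  static Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "trade-consumer");          // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // KafkaAvroDeserializer resolves the writer schema by calling the registry
    // with the schema id embedded in each record; this is the HTTP call that
    // fails when the registry is down.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    return props;
  }
}

This is the exception that repeats for every polled record while the registry is unreachable: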

16:56:41.541 ERROR KafkaMessageListenerContainer$ListenerConsumer -  - org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition trade-0 at offset 810845
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 21
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
        at sun.net.www.http.HttpClient.New(HttpClient.java:339)
        at sun.net.www.http.HttpClient.New(HttpClient.java:357)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1546)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:153)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:323)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:316)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:63)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:118)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)

I tried setting a maximum number of attempts with a RetryTemplate, but without success; it seems the RetryTemplate only applies to my listener method (see the sketch below). I also couldn't find any useful configuration on Confluent's website.
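For reference, this is roughly the RetryTemplate wiring that was tried (a sketch, assuming the standard Spring Kafka 1.3.x container factory; the retry policy values are illustrative). It cannot help here because the SerializationException is thrown inside KafkaConsumer.poll(), before the listener adapter that the RetryTemplate wraps is ever invoked:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class KafkaRetryConfig {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
      ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // give up after 3 attempts
    // Only the @KafkaListener method invocation is retried; deserialization
    // failures happen earlier, during poll(), and never reach this template.
    factory.setRetryTemplate(retryTemplate);
    return factory;
  }
}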


1 Answer


For now I replace KafkaAvroDeserializer with a CustomAvroDeserializer, which extends KafkaAvroDeserializer and overrides its deserialize method, wrapping the body in a try-catch as follows:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.extern.log4j.Log4j;

@Log4j
public class CustomAvroDeserializer extends KafkaAvroDeserializer {

  @Override
  public Object deserialize(String topic, byte[] bytes) {
    try {
      // Delegate to AbstractKafkaAvroDeserializer#deserialize(byte[]), which
      // resolves the writer schema from the registry by the embedded schema id.
      return this.deserialize(bytes);
    } catch (Exception e) {
      // Swallow the failure so the container can commit the offset and move
      // past this record instead of re-polling the same offset forever.
      log.error("encountered a problem when deserializing a message with the schema registry", e);
      return null;
    }
  }
}
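To activate it, point the consumer's value deserializer at the custom class (a sketch; the property key is the standard Kafka consumer config, and props refers to the consumer properties map from the question's setup). Note that the listener must now tolerate records whose value is null, since that is what the catch block returns for undecodable messages:

// replace the stock deserializer with the tolerant one
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomAvroDeserializer.class);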
answered 2018-04-19T03:39:37.253