2

免责声明:我对 KafkaStreams 的体验非常有限。当我所做的只是将主题流式传输到 KTable 中以便以后可以在该商店上使用交互式查询时,org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:我不太明白为什么会出现错误。Schema being registered is incompatible with an earlier schema;

这是 SerDes 配置。

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

Streams 代码:请注意,此时我不希望对 Stream 进行任何过滤或分组,我只希望数据可用于将来通过 Store 查询。

final KStream<String,GenericRecord> stream = builder.stream("my-topic");
stream.toTable(Materialized.as("my-state-store"));

每当我在序列化时streams.start()不断收到这些异常。我有一个架构,所以我使用了SpecificAvroSerdes,但问题是一样的。我想我对为什么我的 KTable 尝试向 Confluent 注册新模式缺少一些基本的理解。

编辑 1: 我现在了解架构寄存器在这里所​​扮演的角色。将 KStream 与 GenericAvroSerde 一起使用,我可以使用来自主题的数据,但仍然无法在 KTable 中实现它。我现在的问题是:

  1. 为什么我总是在同一个分区和偏移量中得到上述异常,即使我没有调用streams.cleanUp(). 为什么它不继续(承诺)。
  2. 这个异常似乎是不可恢复的。所有 Streams 线程都死了,导致应用程序关闭。有没有办法绕过这个?注意:我已经在使用LogAndContinue异常处理程序进行生产和反序列化。

编辑2:

我能够克服这个例外。我的 StateStore 包含具有不兼容架构的先前条目。在我清除主题并更改 ApplicationId 后,它开始工作。

尽管如此,这仍然不能否定捕获Schema being registered is incompatible with an earlier schema;异常的需要。这使 Streams 应用程序停止运行。我尝试使用streams.setUncaughtExceptionHandler可以记录错误的位置,但这并不能阻止 Streams 线程死亡,之后我什至无法启动它们。肯定有办法解决这个问题吗?

4

1 回答 1

0

您描述的异常是致命的,您无法阻止线程在 atm 中死亡,因为线程无法取得任何进展并因此放弃。只有在您解决了问题后,您才能重新启动应用程序(如您所见)。

Btw:你为什么不简单地用builder.table()阅读主题?

于 2020-06-03T01:23:16.910 回答