0

我有一个 SpringBoot 应用程序,其中有两个通过 Spring Cloud 映射的流处理器。每个处理器都有自己的 @StreamListener 用于不同的主题。一个处理器将聚合数据写入 quarable 状态存储。通过我的@Service(服务从状态存储获取聚合数据)获取数据时,我在单元测试中遇到了这个问题。由于某些原因不时捕获异常:

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)

当我从另一个处理器中删除 StreamListener 时,一切正常且稳定。

如何将状态存储与正确的处理器绑定到确切的实例?

4

1 回答 1

0

我已经找到了解决我的问题的方法,也许它会帮助其他人。我使用的不是最新版本的 spring-cloud-stream-binder-kafka-streams。我的版本是 2.0.0,它有一个错误https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/366。仅在 2.0.1 版本中修复

于 2018-09-13T13:52:16.387 回答