我有一个 SpringBoot 应用程序,其中有两个通过 Spring Cloud 映射的流处理器。每个处理器都有自己的 @StreamListener 用于不同的主题。一个处理器将聚合数据写入 quarable 状态存储。通过我的@Service(服务从状态存储获取聚合数据)获取数据时,我在单元测试中遇到了这个问题。由于某些原因不时捕获异常:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, recently-played-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
at org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry.getQueryableStoreType(QueryableStoreRegistry.java:47)
当我从另一个处理器中删除 StreamListener 时,一切正常且稳定。
如何将状态存储与正确的处理器绑定到确切的实例?