我有一个函数,它接受一个 Kafka 流的实例,获取状态存储,解析它并进行一些计算。
void func1(KafkaStreams streams)
{
StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> storeQueryParams =
StoreQueryParameters.fromNameAndType(...);
ReadOnlyKeyValueStore<String, Long> stateStore = streams.store(storeQueryParams);
KeyValueIterator<String, Long> range = stateStore.all();
...
// using this iterator, I will read each record in state store and do some computation.
}
让我们假设 Kafka 流的拓扑是一个简单的拓扑,我们从一个主题中读取并将确切的记录存储在状态存储中。
如何测试这些需要 Kafka 设置的功能?