0

我有一个函数,它接受一个 Kafka 流的实例,获取状态存储,解析它并进行一些计算。

void func1(KafkaStreams streams)
{
   StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> storeQueryParams = 
                StoreQueryParameters.fromNameAndType(...);
   ReadOnlyKeyValueStore<String, Long> stateStore = streams.store(storeQueryParams);
   KeyValueIterator<String, Long> range = stateStore.all();
   ...
   // using this iterator, I will read each record in state store and do some computation.
}

让我们假设 Kafka 流的拓扑是一个简单的拓扑,我们从一个主题中读取并将确切的记录存储在状态存储中。

如何测试这些需要 Kafka 设置的功能?

4

1 回答 1

4

您可以尝试直接测试商店,通过TopologyTestDriver

这是一个例子:

https://github.com/openzipkin-contrib/zipkin-storage-kafka/blob/56afb2e7a0bd4381cab9c97002018d301c331b29/storage/src/test/java/zipkin2/storage/kafka/streams/TraceStorageTopologyTest.java#L186-L200

如果要测试func1访问 KafkaStreams 实例的方法,则需要进行集成测试。TestContainers 可以帮助为您的测试提供一个 Kafka 集群https://www.testcontainers.org/modules/kafka/

于 2020-10-15T10:16:42.330 回答