我正在使用reactor-kafka为 Kafka 消息创建一个反应式消费者,并且我正在寻找一种创建单元测试的方法。
例如,在以下消费者中,我想验证是否已执行提交调用。
import reactor.kafka.receiver.KafkaReceiver;
class ReactiveEventConsumer {
public ReactiveEventConsumer(KafkaReceiver<Integer, Object> kafkaReceiver) {
kafkaReceiver
.receive()
.doOnNext(record -> System.out.println("Kafka message received " + record))
.doOnNext(record -> record.receiverOffset().commit())
.subscribe();
}
}
我知道我可以设置 EmbeddedKafka 并根据代理属性创建 KafkaReceiver。这有点开销,因为我希望我的测试专注于反应部分而不是 kafka 部分。
我可以在没有代理的情况下创建 KafkaReceiver 吗?
还是有其他方法可以测试这种设置?