我应该测试我的代码,以便通过嵌入式“withRunningKafka”使用来自 kafka-server 的所有消息,如下所示:https ://github.com/manub/scalatest-embedded-kafka
- 我试图通过创建的嵌入式生产者向主题发送消息。
- 而且我尝试通过项目中的代码使用生成的消息(由嵌入式生产者创建)。
“使用自定义生产者和消费者进行测试”应该{
"work" in {
withRunningKafka {
1. val producer: KafkaProducer[String, String] =
aKafkaProducer[String](valueSerializer, config)
val topic = "topic-to-test"
producer.send(new ProducerRecord[String, String](topic, "some message 1"))
producer.send(new ProducerRecord[String, String](topic, "some message 2"))
producer.close()
2. val ok: Future[Done] = Consumer
.committableSource(
consumerSettings,
Subscriptions.topics(topic))
.map(msg => println(msg.record.value()))
.runWith(Sink.ignore)
ok should be (Done)
}
}}
问题就在这里:'ok' 并没有给出'Done' 的结果。一般来说,我测试消费者的逻辑是否正确?