1

我应该测试我的代码,以便通过嵌入式“withRunningKafka”使用来自 kafka-server 的所有消息,如下所示:https ://github.com/manub/scalatest-embedded-kafka

  1. 我试图通过创建的嵌入式生产者向主题发送消息。
  2. 而且我尝试通过项目中的代码使用生成的消息(由嵌入式生产者创建)。

“使用自定义生产者和消费者进行测试”应该{

"work" in {

    withRunningKafka {

      1. val producer: KafkaProducer[String, String] =
               aKafkaProducer[String](valueSerializer, config)

         val topic = "topic-to-test"

         producer.send(new ProducerRecord[String, String](topic, "some message 1"))
         producer.send(new ProducerRecord[String, String](topic, "some message 2"))
         producer.close()

      2. val ok: Future[Done] = Consumer
        .committableSource(
            consumerSettings,
            Subscriptions.topics(topic))
        .map(msg => println(msg.record.value()))
        .runWith(Sink.ignore)

       ok should be (Done)
    }
}}

问题就在这里:'ok' 并没有给出'Done' 的结果。一般来说,我测试消费者的逻辑是否正确?

4

2 回答 2

1

欢迎来到堆栈溢出!

原因ok永远不会以结果完成,因为源正在等待可能的进一步消息。在 map 之前添加.take(2),源将在两个元素之后停止,让okfuture 完成。

于 2019-01-31T06:47:41.293 回答
0

我认为您同时面临两个问题:

  1. 卡夫卡消费者无限等待元素(如@dvim所说),所以你需要 .take() 让它真正结束

  2. 默认情况下,kafka 消费者组将从当前主题的末尾开始,而不是开始,因此不会消耗在旋转之前发布的消息。您需要在设置中使其从主题的开头而不是结尾开始。

于 2019-01-31T08:37:15.393 回答