0

我有一个测试,它在气质上留下了一个开放的生产者线程,并带有连续的错误日志记录。

[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

测试有效,但有时会像上面那样失败。

test("My test") {
  val topology = Application.getTopology(...)
  val streams = new KafkaStreams(topology,properties)

  withRunningKafka {
    createCustomTopic(eventTopic)
    val streamId = UUIDs.newUuid().toString
    logger.info(s"Creating stream with Application ID: [$streamId]")
    val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))

    try {
      publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
      // ... several more publishings
      Thread.sleep(publishingDelay) // Give time to initialize
      streams.start()
      Thread.sleep(deletionDelay)

      withConsumer[MyKey, MyEvent, Unit] { consumer =>
        val consumedMessages: Stream[(MyKey, MyEvent)] =
          consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
        val messages = consumedMessages.take(20).toList
        messages.foreach(tuple => logger.info("EVENT   END: " + tuple))
        messages.size should be(6)
        // several assertions here
      }
    } finally {
      streams.close()
    }
  }(config)
}

一个特殊性是流应用程序在其消费的同一主题上产生删除事件。

该套件中有两个类似的测试。我在 sbt 下执行测试套件,如下所示:

testOnly *MyTest

五分之四的执行会留下一个悬空线程无限期地发布这些错误。他们以 3 人一组出现,但我也不知道为什么。

我尝试在调用 close() 后设置延迟,但似乎没有帮助。如何避免悬空的生产者线程?

4

1 回答 1

1

在您的测试中,您正在创建两个KafkaStreams实例,但您只有close()一个。我假设缺少Producer属于您不关闭的实例。KafkaStreams#close()请注意,即使您从未调用过,也需要调用KafkaStreams#start()

于 2018-06-11T03:08:31.943 回答