我有以下流:
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
它可以正常工作一段时间,我可以使用填充在 Kafka 主题上的消息。但有时,显然是在一个随机的时间间隔,没有更多的消息发布,并且这段代码没有记录任何错误(printAndByeBye 将打印传递的消息并终止actor系统。)重新启动应用程序后,消息继续流动。
关于如何知道这里发生了什么的任何想法?
编辑:我把 Kamon 放在上面,我可以看到以下行为:
看起来有些东西在没有通知流应该停止的情况下停止了,但我不知道如何使其明确并停止流。