我有一个场景,我正在使用 alpakka 启动多个 jmsSource(用于不同的队列)。我还需要在任何时间点分离队列。所以我在 jms akka 流中添加了 KillSwitch,如下所示:-
trait MessageListener {
lazy val jmsPipeline = jmsSource
.map { x => log.info(s"Received message ${x} from ${queue}"); x }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) })
(Keep.both)
.run()
def start(): Unit = {
log.info("Invoking listener : {}", queue)
jmsPipeline
log.info("listener : {} started", queue)
}
def stop():Unit = jmsPipeline._1.shutdown()
def queue: String
}
object ListenerA extends MessageListener {
override def queue: String = "Queue_A"
}
object ListenerB extends MessageListener {
override def queue: String = "Queue_B"
}
.. 等等。
启动应用程序后,所有队列都已连接并且工作正常。但是当我尝试使用 stop 方法分离队列时,并非所有队列都断开连接并且行为是随机的。我还检查了所有听众的 killSwitch 是否不同。
有人可以告诉我这里出了什么问题吗?