1

我有一个场景,我正在使用 alpakka 启动多个 jmsSource(用于不同的队列)。我还需要在任何时间点分离队列。所以我在 jms akka 流中添加了 KillSwitch,如下所示:-

trait MessageListener  {

  lazy val jmsPipeline = jmsSource
    .map { x => log.info(s"Received message ${x} from ${queue}"); x }
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) })
    (Keep.both)
    .run()

   def start(): Unit = {
             log.info("Invoking listener : {}", queue)
             jmsPipeline
             log.info("listener : {} started", queue)
          }
  def stop():Unit  =     jmsPipeline._1.shutdown()

  def queue: String

}

object ListenerA extends MessageListener {
  override def queue: String = "Queue_A"
}

object ListenerB extends MessageListener {
  override def queue: String = "Queue_B"
} 

.. 等等。

启动应用程序后,所有队列都已连接并且工作正常。但是当我尝试使用 stop 方法分离队列时,并非所有队列都断开连接并且行为是随机的。我还检查了所有听众的 killSwitch 是否不同。

有人可以告诉我这里出了什么问题吗?

4

1 回答 1

0

您的日志支持您连接到具有不同流的多个队列的错觉,但您有多个流可能连接到同一个队列。在您的两个侦听器对象中,记录器都会记录覆盖的queue名称,但此队列名称不用于配置jmsSource

您没有显示jmsSource;的定义 显然它是在MessageListener特征之外的某个地方定义的,在这种情况下,两者ListenerAListenerB都使用相同的jmsSource. 换句话说,虽然ListenerAListenerB具有不同的实例jmsPipeline(这就是终止开关不同的原因),但这两个jmsPipeline实例都是从同一个jmsSource实例派生的(除非jmsSourcea在每次调用时def创建不同的实例Source,但即使是这种情况,基本问题依然存在:queue没有在配置中使用)。

在 Alpakka 中,JMS 队列配置在 上JmsSourceSettings,因此jmsSource可能如下所示:

val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
  JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue")
)                        // the queue is configured here ^

ListenerA.start()例如,当调用 时,将记录以下内容:

Invoking listener : Queue_A
listener : Queue_A started

同样,"Queue_A"在上面的日志语句中是被覆盖的def queue: String成员的值ListenerA;它不一定是实际配置的队列jmsSource"MyQueue"在上面的示例中)。ListenerB与您在组合器中登录的消息相同map

一个简单的解决方法是将trait的定义jmsSource及其JmsSourceSettings内部移动并在这些设置中实际使用。MessageListenerqueue

于 2017-12-27T21:28:00.047 回答