0

我的简化Akka Camel应用程序设置如下:

AppleProducer -> seda:appleRoute -> AppleConsumer

OrangeProducer -> seda:orangeRoute -> OrangeConsumer

我看到的现象是:Apple 事件会间歇性地被 OrangeConsumer 消费,反之亦然。

运行下面的示例(可能需要运行几次)即可重现该问题。

我不明白为什么这只是间歇性地发生。我究竟做错了什么?

/** Reproduction driver: starts one producer and one consumer per SEDA route, then
  * alternates Apple and Orange messages (ids "1" through "6") between the two producers.
  */
object TestApp extends App {
  implicit val system = ActorSystem()
  val camel = CamelExtension(system)

  // Each route string is shared by the producer and the consumer of the same fruit.
  private val appleRoute = "seda:appleRoute"
  private val orangeRoute = "seda:orangeRoute"

  val appleProducer = system.actorOf(Props(classOf[MyProducer], appleRoute), "AppleProducer")
  system.actorOf(Props(classOf[MyAppleConsumer], appleRoute), "AppleConsumer")
  val orangeProducer = system.actorOf(Props(classOf[MyProducer], orangeRoute), "OrangeProducer")
  system.actorOf(Props(classOf[MyOrangeConsumer], orangeRoute), "OrangeConsumer")

  // Same send order as writing the twelve sends out by hand: apple n, orange n, apple n+1, ...
  (1 to 6).map(_.toString).foreach { id =>
    appleProducer ! new Apple(id)
    orangeProducer ! new Orange(id)
  }

}

/** Actor that forwards every message it receives to the Camel endpoint named by `route`.
  *
  * It does not create its own producer template; it looks one up from the actor system's
  * CamelExtension on each message, points that template's default endpoint at `route`,
  * and then sends the payload to the default endpoint.
  *
  * NOTE(review): the template appears to be a single instance shared by every actor in the
  * system (confirm against the akka-camel sources). If so, "set default endpoint" followed
  * by "send to default endpoint" is not atomic across actors: another MyProducer with a
  * different `route` can overwrite the default between the two calls, and the payload is
  * then delivered to the wrong route. That would explain intermittent cross-delivery.
  * Presumably passing the endpoint explicitly, e.g. `template.sendBody(route, payload)`,
  * avoids touching the shared default -- TODO confirm.
  */
class MyProducer(route: String) extends Actor with ActorLogging  {

  def receive = {
    // Matches any message; the payload is forwarded unchanged.
    case payload: Any =>
      // Looked up from the extension on every message, not owned by this actor.
      val template = CamelExtension(context.system).template
      // Mutates the template's default endpoint ...
      template.setDefaultEndpointUri(route)
      // ... and then sends to whatever the default endpoint is at this moment.
      template.sendBody(payload)
  }
}

/** Camel consumer listening on `route` that accepts only messages whose body is an Apple.
  *
  * @param route endpoint URI this consumer is registered on
  */
class MyAppleConsumer(route: String) extends Consumer with ActorLogging {
  override def endpointUri: String = route

  override def receive: Receive = {
    case CamelMessage(apple: Apple, _) =>
      log.info("Received event {}", apple)
    case _ =>
      // Any other body type, or a message that is not a CamelMessage, is rejected.
      throw new IllegalArgumentException("Invalid entity")
  }
}

/** Camel consumer listening on `route` that accepts only messages whose body is an Orange.
  *
  * @param route endpoint URI this consumer is registered on
  */
class MyOrangeConsumer(route: String) extends Consumer with ActorLogging {
  override def endpointUri: String = route

  override def receive: Receive = {
    case CamelMessage(orange: Orange, _) =>
      log.info("Received event {}", orange)
    case _ =>
      // Any other body type, or a message that is not a CamelMessage, is rejected.
      throw new IllegalArgumentException("Invalid entity")
  }
}

/** Apple event payload.
  *
  * A case class so that `id` is kept as a readable field and the value has structural
  * equality and a meaningful `toString` (e.g. `Apple(1)`) when it is logged.
  * Construction with `new Apple("1")` keeps working.
  *
  * @param id identifier of the event
  */
final case class Apple(id: String)

/** Orange event payload; see [[Apple]] for why this is a case class.
  *
  * @param id identifier of the event
  */
final case class Orange(id: String)
4

2 回答 2

0

我想我最终设法弄清楚了这一点。

这个问题与 SEDA 无关。相反,似乎多个 MyProducer 实例拿到的是同一个 DefaultProducerTemplate 实例。

因此,在设置 defaultEndpointUri 时存在竞争条件。

对我来说,解决方案是只创建一个 MyProducer actor 实例,以确保不会遇到这种竞争条件。

于 2016-06-03T21:29:08.217 回答
0

我建议让 MyProducer 扩展 Producer trait,而不是使用模板,就像 MyAppleConsumer 和 MyOrangeConsumer 使用 Consumer 一样。

/** Producer actor that forwards each message it receives to the Camel endpoint `route`.
  *
  * Mixes in `Oneway` (the akka-camel trait is spelled with a lowercase "w"; `OneWay`
  * does not exist and fails to compile).
  *
  * @param route endpoint URI to produce to
  */
class MyProducer(route: String) extends Producer with Oneway {
  override def endpointUri: String = route
}

更多信息可以在这里找到:http://doc.akka.io/docs/akka/snapshot/scala/camel.html

我相信您应该能够像这样简化代码(免责声明:未编译或测试!):

/** Apple event payload, identified by `id`.
  *
  * Declared `final`: extending a case class breaks the generated equality and pattern
  * matching, so it is closed to inheritance.
  */
final case class Apple(id: String)

/** Orange event payload, identified by `id`. Final for the same reason as [[Apple]]. */
final case class Orange(id: String)

/** Driver for the simplified setup: one producer and one consumer per SEDA route, fed
  * with alternating Apple and Orange messages carrying ids "1" through "6".
  */
object TestApp extends App {
  implicit val system = ActorSystem()

  // Each route string is shared by the producer and the consumer of the same fruit.
  private val appleRoute = "seda:appleRoute"
  private val orangeRoute = "seda:orangeRoute"

  val appleProducer = system.actorOf(Props(classOf[MyProducer], appleRoute), "AppleProducer")
  system.actorOf(Props(classOf[MyConsumer], appleRoute), "AppleConsumer")
  val orangeProducer = system.actorOf(Props(classOf[MyProducer], orangeRoute), "OrangeProducer")
  system.actorOf(Props(classOf[MyConsumer], orangeRoute), "OrangeConsumer")

  // Same send order as twelve hand-written sends: apple n, orange n, apple n+1, ...
  (1 to 6).map(_.toString).foreach { id =>
    appleProducer ! Apple(id)
    orangeProducer ! Orange(id)
  }

}

/** Producer actor that forwards each message it receives to the Camel endpoint `route`.
  *
  * Mixes in `Oneway` (the akka-camel trait is spelled with a lowercase "w"; `OneWay`
  * does not exist and fails to compile).
  *
  * @param route endpoint URI to produce to
  */
class MyProducer(route: String) extends Producer with Oneway with ActorLogging {
  override def endpointUri: String = route
}

/** Camel consumer listening on `route` that logs Apple and Orange bodies and rejects
  * everything else.
  *
  * @param route endpoint URI this consumer is registered on
  */
class MyConsumer(route: String) extends Consumer with ActorLogging {
  override def endpointUri: String = route

  override def receive: Receive = {
    // Both fruit types are handled the same way, so one alternative pattern covers them.
    case CamelMessage(fruit @ (_: Apple | _: Orange), _) =>
      log.info("Received event {}", fruit)
    case _ =>
      throw new IllegalArgumentException("Invalid entity")
  }
}
于 2016-06-05T19:20:11.210 回答