2

我是 Akka 的新手,我想弄清楚它是否具有对企业集成模式 (EIP)的内置支持,或者我是否需要将这种类型的路由/集成委托给像Camel这样的框架。

在我的用例中,我有一个从源(文件)读取二进制样本的参与者;这个演员叫做SamplerSamplerthan 将实例Sample(消息)传递到一个名为SampleProcessors. 每个示例处理器对给定的Sample. 根据处理 a 的处理器的结果Sample,它可能需要路由到 1+ other SampleProcessor,或者可能所有处理都已结束。根据给SampleProcessor定的确切和确切性质SampleSample可能需要多播到其他接收者的列表SampleProcessors

这一切对我来说就像骆驼。

所以我问:

  • Akka 是否具有对路由、广播、多播和其他 EIP 的内置支持(如果有,它们是什么以及它们在哪里记录)?
  • 或者,我应该尝试将 Actor 系统与 Camel 集成,在这种情况下,它会是什么样子?我知道有一个 Camel-Akka 组件,但我相信这只是为了将 Camel 总线与演员系统集成(而我想要一个内部的服务总线我的演员系统中使用服务总线)
  • 或者,我应该在这里自己做 EIP/actor 布线吗?
4

1 回答 1

4

Akka 本身并不支持 EIP,但有几种方法可以实现它。

无论如何,如果你想要一些方便的 DSL,有一个比 EIP 更好的主意——就像GoF 模式map一样,你可以用函数组合 + Functors () 和 Monads ( )替换(实现)大多数 EIP 模式flatMap。换句话说,您可以将输入流视为无限集合。所以,

  • 处理器变成函数
  • 管道成为函子,比如val output1 = input.map(processor1).map(processor2)
  • 路由器和过滤器成为...... monads(filter基于flatMap):

    val fork1 = output1.filter(routingCondition1).map(...)

    val fork2 = output1.filter(routingCondition2).map(...)

  • 分裂是flatMapinput.flatMap(x => Stream(x.submsg1, x.submsg2))

  • 聚合器变成变态,又名fold(累加器通常应该由一些存储支持)

这种基于流的工作流已经为 Akka 实现了,它被称为Akka Streams,它是Reactive Streams的实现,另见这篇那篇文章。

另一种选择是按原样使用 Akka,actor 保证顺序处理,因此您可以通过创建 actor 链来实现管道:

class Processor1(next: ActorRef) extends Actor {
   def receive = {
      case x if filterCondition => 
      case x => next ! process(x)
   }
}

val processor2 = system.actorOf(Props[Processor2])
val processor1 = system.actorOf(Props[Processor1], processor2)

如果您需要路由 - 它只是两个“下一个”

class Router(next1: ActorRef, next2: ActorRef) extends Actor {
   def receive = {
      case x if filterCondition => 
      case x if cond1 => next1 ! process(x)
      case x if cond2 => next2 ! process(x)
   }
}

如果您需要保证路线之间没有比赛 - 请参阅此答案。当然,你放弃了整个 DSL 的想法,直接使用演员。

PS 是的,你仍然可以使用 Camel 作为端点——Akka 对此有一些支持。你可以使用 Akka 作为服务激活器。

于 2015-01-27T22:30:21.270 回答