Akka 本身并不支持 EIP,但有几种方法可以实现它。
无论如何,如果你想要一些方便的 DSL,有一个比 EIP 更好的主意——就像GoF 模式map
一样,你可以用函数组合 + Functors () 和 Monads ( )替换(实现)大多数 EIP 模式flatMap
。换句话说,您可以将输入流视为无限集合。所以,
- 处理器变成函数
- 管道成为函子,比如
val output1 = input.map(processor1).map(processor2)
路由器和过滤器成为...... monads(filter
基于flatMap
):
val fork1 = output1.filter(routingCondition1).map(...)
val fork2 = output1.filter(routingCondition2).map(...)
分裂是flatMap
:input.flatMap(x => Stream(x.submsg1, x.submsg2))
- 聚合器变成变态,又名
fold
(累加器通常应该由一些存储支持)
这种基于流的工作流已经为 Akka 实现了,它被称为Akka Streams,它是Reactive Streams的实现,另见这篇
和那篇文章。
另一种选择是按原样使用 Akka,actor 保证顺序处理,因此您可以通过创建 actor 链来实现管道:
class Processor1(next: ActorRef) extends Actor {
def receive = {
case x if filterCondition =>
case x => next ! process(x)
}
}
val processor2 = system.actorOf(Props[Processor2])
val processor1 = system.actorOf(Props[Processor1], processor2)
如果您需要路由 - 它只是两个“下一个”
class Router(next1: ActorRef, next2: ActorRef) extends Actor {
def receive = {
case x if filterCondition =>
case x if cond1 => next1 ! process(x)
case x if cond2 => next2 ! process(x)
}
}
如果您需要保证路线之间没有比赛 - 请参阅此答案。当然,你放弃了整个 DSL 的想法,直接使用演员。
PS 是的,你仍然可以使用 Camel 作为端点——Akka 对此有一些支持。你可以使用 Akka 作为服务激活器。