我正在使用 Akka(最新的稳定版本)akka-camel
和 JMS(出于本次对话的目的,假设它是 ActiveMQ,但理想情况下,解决方案应该是通用的)。
用例
我有以下用例。在队列中Q
,我收到如下消息:
time: 1 2 3 4 5 6
| A1 | B1 | C1 | C2 | A2 | B2 | ....
^ ^
first latest
我的最终目标是将它们配对成(A1,A2)
, (B1,B2)
, 等等; 尽管存在重复消息和未传递消息等复杂情况,但复杂之处在于我必须确保在匹配和处理整对消息之前,代理将保留所有未确认的消息。
例子
在4
我收到并处理了 4 条消息,并成功处理了 pair (C1, C2)
,但是我仍然无法向代理确认任何内容,因为A1
并且B1
仍然不匹配和待处理,并且在 JMS中确认C2
意味着确认所有消息直到C2
. 事实上,我可以发回的第一个确认是在 time 5
,whenA2
收到:此时我可以确认A1
(并且只有A1
, asB1
仍然未决)。
问题
现在,我似乎无法弄清楚如何通过. 我一直在网上阅读,虽然我可以找到有关如何手动确认消息的解释(文档和示例),但没有任何内容显示如何向代理确认先前处理的消息。akka-camel
import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure
class Consumer3 extends Consumer {
override def autoAck = false
def endpointUri = "jms:queue:test"
def receive = {
case msg: CamelMessage =>
sender() ! Ack
// on success
// ..
val someException = new Exception("e1")
// on failure
sender() ! Failure(someException)
}
}
在这种情况下,Ack
is anobject
并且它的语义实际上只是:我确认当前消息,而我需要类似Message X is now acknowledge之类的东西,其中X是一些先前的消息,但不一定是当前消息。
这个用例是否支持或支持,akka-camel
或者我应该自己构建它?
谢谢