我有一些 Akka 1.3 代码处理与不直接支持回复概念的外部消息传递系统的集成。我正在尝试将代码库升级到 Akka 2.0 并且遇到了麻烦,因为我现有的设计依赖于通过将密钥生成到 ConcurrentHashMap 并使用 senderFuture 进行清理来模拟“回复”机制。
发送给正在处理调度的参与者的消息可能会或可能不会期待回复,即使他们期待回复,在通过网络发送后,他们也可能永远不会得到回复。因此,HashMap 应该只存储真正需要回复(询问,而不是告诉)的消息条目,并且它必须具有某种形式的清理机制来清除从未收到回复的条目。
在 Akka 1.3 中,我通过附加到 senderFuture 来做到这一点,所以当发送消息的参与者超时并放弃回复时,HashMap 中的相应条目也会被删除:
if (self.senderFuture().isDefined) {
pendingRequests.put(replyToKey, sender)
self.senderFuture().get.onComplete( f => {
pendingRequests.remove(replyToKey)
}
}
由于 Akka 2.0 已经移除了对 senderFuture 的可访问性,是否有一种干净的方式来处理这种情况?还是我只需要从头开始创建一个清理过程?