2

我有一些 Akka 1.3 代码处理与不直接支持回复概念的外部消息传递系统的集成。我正在尝试将代码库升级到 Akka 2.0 并且遇到了麻烦,因为我现有的设计依赖于通过将密钥生成到 ConcurrentHashMap 并使用 senderFuture 进行清理来模拟“回复”机制。

发送给正在处理调度的参与者的消息可能会或可能不会期待回复,即使他们期待回复,在通过网络发送后,他们也可能永远不会得到回复。因此,HashMap 应该只存储真正需要回复(询问,而不是告诉)的消息条目,并且它必须具有某种形式的清理机制来清除从未收到回复的条目。

在 Akka 1.3 中,我通过附加到 senderFuture 来做到这一点,所以当发送消息的参与者超时并放弃回复时,HashMap 中的相应条目也会被删除:

if (self.senderFuture().isDefined) {
  pendingRequests.put(replyToKey, sender)
  self.senderFuture().get.onComplete( f => {
    pendingRequests.remove(replyToKey)
  }
}

由于 Akka 2.0 已经移除了对 senderFuture 的可访问性,是否有一种干净的方式来处理这种情况?还是我只需要从头开始创建一个清理过程?

4

1 回答 1

2

Akka 2.x 中没有保留您正在寻找的精确功能,因为

  • 它使远程参与者通信变得脆弱,并且
  • 它在通信中打开了一个不明显的旁通道,最好是明确的。

使用senderFuture意味着您选择清理操作是客户端参与者的责任。您仍然可以通过简单地将此信息包含在您发送的消息中来做到这一点,而不是将其放入消息的发送方式中。

要考虑的另一件事是 2.0 中添加的DeathWatch功能:您可以发出需要清理的信号,然后目标可以作为请求的发送者,在收到相应消息context.watch()时进行清理(由发送者的死亡触发)。Terminated然后客户端仍然可以使用适当超时的询问模式,这会将定时的兴趣损失传达回服务参与者。

于 2013-01-08T13:08:14.120 回答