3

我正在尝试在 akka 演员之间建立一个消息传递过程,以代表主人给工人一份工作,并密切关注它。我的问题是

  1. 是我在合理的方法下提出的建议吗?
  2. 即使不是,为了我的未来教育,我想知道如何通过 Futures 的组合来正确完成它。

我想要的过程是这样的

1) Master 用ask. 它希望在 5 秒内得到答复,否则它认为工人已经失去了机会,它将不得不再次进入投标。

import context.dispatcher
implicit val timeout = Timeout(5 seconds)
val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[Future[WorkCompleted]]

2a)如果工人在 5 秒内没有响应,我希望主人向自己发送一条消息,说重新分配工作。

self ! WorkAllocationFailed(work, worker)

2b)如果工人确实做出了回应,那么它会给我们一个 Future[WorkCompleted]。我想等待那个未来完成,比如说,2 分钟。

3a)如果 Future[WorkCompleted] 未能在超时时间内完成,则重新分配工作

self ! WorkFailed(work, worker)

3b)如果 Future[WorkCompleted] 成功则收集结果

我已经尝试过创建这个逻辑,但是我把嵌套搞得一团糟onComplete,而且我不知道如何在 Future[WorkCompleted] 上进行超时。我尝试阅读Akka 2.10 Futures 文档,但找不到解决方案。

4

2 回答 2

2

一般的想法是,你有一个将工作交给一群工人的主人是一个合理的模式。

另一方面,当系统的所有部分都已经是参与者时,我不建议使用 Futures。无需使用 ask 提交作品,您只需通过 tell 发送即可。然后,master 可以定期检查超时的作业并再次重新提交它们。

此外,在 actor 的主体中调用 onComplete 非常危险,因为它在可能不同的线程上执行。与参与者交流的安全方式是通过消息传递。如果你有一个 Future 并且希望在 Future 完成后在 Actor 中做一些事情,那么最好使用管道模式。

您的代码段中还有一个小错误。如果您的工人演员回复 WorkCompleted,那么这就是您真正想要的行:

val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[WorkCompleted]
于 2013-01-14T12:30:14.193 回答
2

我同意 Endre 的回答 - 都是非常好的观点。

这个怎么样:

1) 为自己安排一条超时消息(使用system.scheduler.scheduleOnce

2)使用常规向工人发送工作消息tell

3a) 如果完成的工作在超时消息之前返回,取消计划的工作并使用步骤 1 和 2 重新分配工作

3b) 如果完成的工作在超时消息后返回,则忽略它或取消重新分配的工作。

未来可能对工人有用的一个地方是,特别是如果工作需要很长时间或正在阻塞。工作人员可以使用未来来完成工作并保持可用以处理更多传入消息,例如取消工作。

于 2013-01-14T13:51:45.037 回答