8

我有未知数量的工作要由已知(当然)数量的演员执行。演员完成工作后,初始工作数量可能会增加。也就是说,参与者在完成其任务后,可能会添加要执行的新作业。

我处理这个问题的方法是让每个参与者在完成其工作时向主节点发送一条消息,不仅包含执行结果,还包含一个指示参与者现在空闲的“标志”。master 有一个作业队列和一个空闲 actor 队列,每当一个 actor 发送“作业完成消息”时,master 将检查该 actor 是否还有其他事情要做……依此类推,直到作业队列是空的,空闲队列已满……那时我关闭了系统。这里没有太多的监督,所以我觉得我做得不好......

我没有使用路由器,因为我找不到向路由器查询空闲演员的方法,所以我的问题是:

处理我上面在 Akka 中描述的情况的“正确”方法是什么?

4

3 回答 3

8

你应该看看Akka 的路由功能。 SmallestMailboxRouter可能是您正在寻找的。

作为替代方案,您可以按需创建角色,即对于每个任务,动态创建一个新角色。中心参与者跟踪当前活动的所有参与者。一旦一个工作角色完成,它会向自己发送一个 aPoisonPill并通知主节点它的关闭(主动地,或者通过TerminateAkka 将发送给监督角色的标准消息)。一旦没有更多的活动参与者,即不再有任务,控制参与者就会关闭系统。

阅读评论后的补充: 看看 的来源SmallestMailboxLike,一个 Scala trait 混合在SmallestMailboxRouter. 警告:您应该具备 Scala 的基本知识。但是如果你想使用 Akka,这通常是一个好主意......方法isProcessingMessage(ActorRef)可以理解为isNotIdle(ActorRef)

// Returns true if the actor is currently processing a message.
// It will always return false for remote actors.
// Method is exposed to subclasses to be able to implement custom
// routers based on mailbox and actor internal state.
protected def isProcessingMessage(a: ActorRef): Boolean = a match {
  case x: LocalActorRef ?
    val cell = x.underlying
    cell.mailbox.isScheduled && cell.currentMessage != null
  case _ ? false
}

// Returns true if the actor currently has any pending messages
// in the mailbox, i.e. the mailbox is not empty.
// It will always return false for remote actors.
// Method is exposed to subclasses to be able to implement custom
// routers based on mailbox and actor internal state.
protected def hasMessages(a: ActorRef): Boolean = a match {
  case x: LocalActorRef ? x.underlying.mailbox.hasMessages
  case _                ? false
}
于 2012-07-07T19:36:06.377 回答
1

另一种策略可以是使用 BalancingDispatcher 和 RoundRobinRouter(作为演员“池”)。来自 Akka 文档:

BalancingDispatcher
# This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.




# All the actors share a single Mailbox that they get their messages from.

It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.

# Sharability: Actors of the same type only

# Mailboxes: Any, creates one for all Actors

# Use cases: Work-sharing

在 application.conf 中定义您的调度程序或在启动时以编程方式加载它。

private final static Config akkaConfig = ConfigFactory.parseString(

            "my-dispatcher.type = BalancingDispatcher \n" +
            "my-dispatcher.executor = fork-join-executor \n" +
            "my-dispatcher.fork-join-executor.parallelism-min = 8 \n" +
            "my-dispatcher.fork-join-executor.parallelism-factor = 3.0 \n" +
            "my-dispatcher.fork-join-executor.parallelism-max = 64 "
);

然后为路由定义路由器和调度程序。

getContext().actorOf(new Props(MyActor.class).withRouter(new RoundRobinRouter(10)).withDispatcher("my-dispatcher"), "myActor");

所以路由器将简单地继续“分发”消息,调度程序将运行一个选定的参与者(它也实现了工作窃取)

于 2012-07-27T15:14:00.983 回答
-1

Balancing Dispatcher 将只为所有使用 BalancingDispatcher 创建的所有创建的 Actor 使用一个邮箱。所以它会让你的工作变得简单。

于 2017-07-27T05:30:10.167 回答