1

我有一个演员(工人),它基本上向其他 3 个演员(Filter1、Filter2 和 Filter3)询问结果。如果其中任何一个返回 false,则无需等待其他的,例如对结果进行“与”操作。当收到错误响应时,会向参与者发送取消消息,以取消排队的工作并使其在执行中更有效。

过滤器不是 Worker 的子级,但有一个公共的 Actor 池,所有 Worker Actor 都使用它。我使用代理来维护取消作品的收集。然后,在处理特定工作之前,如果该工作被取消,我会检查取消代理,然后避免执行它。Cancel 的优先级高于 Work,因此,它总是首先被处理。

代码是这样的

创建演员树的代理:

import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext.Implicits.global

import com.typesafe.config.Config

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.actor.Props
import akka.agent.Agent
import akka.routing.RoundRobinRouter

class Proxy extends Actor with ActorLogging {
  val agent1 = Agent(new HashSet[Work])
  val agent2 = Agent(new HashSet[Work])
  val agent3 = Agent(new HashSet[Work])

  val filter1 = context.actorOf(Props(Filter1(agent1)).withDispatcher("priorityMailBox-dispatcher")
    .withRouter(RoundRobinRouter(24)), "filter1")
  val filter2 = context.actorOf(Props(Filter2(agent2)).withDispatcher("priorityMailBox-dispatcher")
    .withRouter(RoundRobinRouter(24)), "filter2")
  val filter3 = context.actorOf(Props(Filter3(agent3)).withDispatcher("priorityMailBox-dispatcher")
    .withRouter(RoundRobinRouter(24)), "filter3")

  //val workerRouter = context.actorOf(Props[SerialWorker].withRouter(RoundRobinRouter(24)), name = "workerRouter")

  val workerRouter = context.actorOf(Props(new Worker(filter1, filter2, filter3)).withRouter(RoundRobinRouter(24)), name = "workerRouter")

  def receive = {
    case w: Work =>
      workerRouter forward w
  }

}

工人:

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout
import akka.actor.ActorRef
import akka.routing.RoundRobinRouter
import akka.agent.Agent
import scala.collection.mutable.HashSet

class Worker(filter1: ActorRef, filter2: ActorRef, filter3: ActorRef) extends Actor with ActorLogging {

  implicit val timeout = Timeout(30.seconds)

  def receive = {
    case w:Work =>

      val start = System.currentTimeMillis();

      val futureF3 = (filter3 ? w).mapTo[Response]
      val futureF2 = (filter2 ? w).mapTo[Response]
      val futureF1 = (filter1 ? w).mapTo[Response]

      val aggResult = Future.find(List(futureF3, futureF2, futureF1)) { res => !res.reponse }

      Await.result(aggResult, timeout.duration) match {
        case None =>
          Nqueen.fact(10500000L)
          log.info(s"[${w.message}] Procesado mensaje TRUE en ${System.currentTimeMillis() - start} ms");
          sender ! WorkResponse(w, true)

        case _ =>
          filter1 ! Cancel(w)
          filter2 ! Cancel(w)
          filter3 ! Cancel(w)
          log.info(s"[${w.message}] Procesado mensaje FALSE en ${System.currentTimeMillis() - start} ms");
          sender ! WorkResponse(w, false)
      }
  }
}

和过滤器:

import scala.collection.mutable.HashSet
import scala.util.Random

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.actorRef2Scala
import akka.agent.Agent

trait CancellableFilter { this: Actor with ActorLogging =>

  //val canceledJobs = new HashSet[Int]
  val agent: Agent[HashSet[Work]]

  def cancelReceive: Receive = {
    case Cancel(w) =>
      agent.send(_ += w)
    //log.info(s"[$t] El trabajo se cancelara (si llega...)")
  }

  def cancelled(w: Work): Boolean =
    if (agent.get.contains(w)) {
      agent.send(_ -= w)
      true
    } else {
      false
    }

}

abstract class Filter extends Actor with ActorLogging { this: CancellableFilter =>

  val random = new Random(System.currentTimeMillis())

  def response: Boolean

  val timeToWait: Int

  val timeToExecutor: Long

  def receive = cancelReceive orElse {
    case w:Work if !cancelled(w) =>
      //log.info(s"[$t] Llego trabajo")
      Thread.sleep(timeToWait)
      Nqueen.fact(timeToExecutor)
      val r = Response(response)
      //log.info(s"[$t] Respondio ${r.reponse}")
      sender ! r
  }
}

object Filter1 {
  def apply(agente: Agent[HashSet[Work]]) = new Filter with CancellableFilter {
    val timeToWait = 74
    val timeToExecutor = 42000000L
    val agent = agente
    def response = true //random.nextBoolean
  }
}

object Filter2 {
  def apply(agente: Agent[HashSet[Work]]) = new Filter with CancellableFilter {
    val timeToWait = 47
    val timeToExecutor = 21000000L
    val agent = agente
    def response = true //random.nextBoolean
  }
}

object Filter3 {
  def apply(agente: Agent[HashSet[Work]]) = new Filter with CancellableFilter {
    val timeToWait = 47
    val timeToExecutor = 21000000L
    val agent = agente
    def response = true //random.nextBoolean
  }
}

基本上,我认为 Worker 代码很丑,我想让它变得更好。你能帮忙改进吗?

我要改进的另一点是取消消息。由于我不知道哪些过滤器完成了,我需要全部取消,然后,至少一个取消是多余的(因为这项工作已经完成)

4

1 回答 1

1

这是次要的,但你为什么不将过滤器存储为序列?filters.foreach(f ! Cancel(w))

filter1 ! Cancel(w)
filter2 ! Cancel(w)
filter3 ! Cancel(w)

其他情况也一样:

class Worker(filter1: ActorRef, filter2: ActorRef, filter3: ActorRef) extends Actor with ActorLogging {
  private val filters = Seq(filter1, filter2, filter3)

  implicit val timeout = Timeout(30.seconds)

  def receive = {
    case w:Work =>

      val start = System.currentTimeMillis();
      val futures = filters.map { f =>
        (f ? w).mapTo[Response]
      }

      val aggResult = Future.find(futures) { res => !res.reponse }

      Await.result(aggResult, timeout.duration) match {
        case None =>
          Nqueen.fact(10500000L)
          log.info(s"[${w.message}] Procesado mensaje TRUE en ${System.currentTimeMillis() - start} ms");
          sender ! WorkResponse(w, true)

        case _ =>
          filters.foreach(f ! Cancel(w))
          log.info(s"[${w.message}] Procesado mensaje FALSE en ${System.currentTimeMillis() - start} ms");
          sender ! WorkResponse(w, false)
      }
  }

您也可以考虑编写构造函数,就Worker(filters: ActorRef*)好像您不强制执行三个过滤器一样。它认为发送一个冗余取消是可以的(我看到的替代方案过于复杂)。我不确定,但如果过滤器的创建速度非常快,是否可能会使用相同的种子值初始化随机数。

于 2013-09-12T15:20:43.507 回答