4

我是一位经验丰富的 Java 程序员,我开始开发基于 actor 的 Scala 应用程序。在我目前正在开发的应用程序中,我必须处理表现出自主和反应行为的 Sender Actor 的实现。场景如下(伪代码):

Actor Sender{

        Active behavior (must be active once the actor boots):
            do-in-sequence{
                send to Stdout A
                send to Stdout B
                send to Stdout C
                send stop to Stdout and then exit
            }

        Reactive behavior (must be active once the actor boots):
            as soon as received stop from StopNotifier -> send stop to Stdout and then exit
    }
}

Actor Stdout{
    Purely reactive behavior (i.e. wait for msg){
        as soon as received A -> print A
        as soon as received B -> print B
        as soon as received C -> print C
        as soon as received stop from Sender -> exit
    }
}
Actor StopNotifier
    Purely active behavior {
        compute, and when some condition is met -> send stop to Sender
    }

我的问题是:对于需要集成自主性和反应性(如本文所述)的 Scala 演员来说,表达自主行为的最佳方式是什么?

换句话说,在上面的示例中,对 Sender 演员进行编码的最佳方式/风格是什么?

我想出了一个解决方案(在下面报告),但由于我不是 scala 大师(还 :))我想知道我所实施的是否可以在更好/更好的解决方案中得到改进。

case object START
case object A
case object B
case object C
case object SENT_A
case object SENT_B
case object ACK_A
case object ACK_B
case object ACK_C
case object STOP

class Sender(stdout: Stdout) extends Actor {
    def act() {
        self!START
        while (true){
            receive {
                case START =>
                    stdout!?A
                    self!SENT_A
                case SENT_A =>
                    stdout!?B
                    self!SENT_B
                case SENT_B =>
                    stdout!?C
                    stdout!?STOP
                    exit()
                case STOP => {
                    Console.println("[Sender:]Received STOP, terminating")
                    stdout!?STOP
                    exit()
                }
            }
        }
    }
}

class Stdout() extends Actor {
    def act() { 
        while (true) {
            receive{
                case A =>
                    Console.println("[Stdout:]A")
                    reply(ACK_A)
                case B =>
                    Console.println("[Stdout:]B")
                    reply(ACK_B)
              case C =>
                    Console.println("[Stdout:]C")
                    reply(ACK_C)
                    exit()
              case STOP =>
                    Console.println("[Stdout:]Received STOP, terminating")
                    exit()
            }
        }
    }
}

class StopNotifier(sender: Sender) extends Actor {
    def act() {
        /*
         * The idea is that the StopNotifier should send a STOP message to the Sender
         * when a certain condition is met.
         * The sleep used here is just a semplification, since the detection of such
         * a condition is not relevant for the example.
         */

        Thread.sleep(200)
        Console.println("[StopNotifier:]Sending STOP to sender")
        sender ! STOP
        exit()
   }
}

object app extends Application {
    val stdout = new Stdout
    stdout.start
    val sender = new Sender(stdout)
    sender.start
    val stopNotifier = new StopNotifier(sender)
    stopNotifier.start
}

特别是在我当前的实现中困扰我的是,为了能够对从 StopNotifier 接收到的 STOP 消息迅速做出反应,我需要在 Sender 的每个执行步骤(即在将 A、B 发送到 Stdout 演员)。在我看来,做事情的正确方法太棘手了:)。

我还尝试使用其他 Scala 语言结构(例如异步发送、反应等)开发其他解决方案,但在我看来,它们似乎受到其他问题/技巧的影响。

有没有人有更好的解决方案来处理 scala 演员中自治和反应行为的整合?

4

2 回答 2

4

如果我理解正确,您应该改用 Akka 演员,特别是 Akka FSM,将发送者建模为状态机。Akka actor 有一个内置的停止机制,或者您始终可以使用自己的消息,这些消息可以通过处理程序从所有状态进行whenUnhandled处理。

http://doc.akka.io/docs/akka/snapshot/scala/fsm.html

这显然是矫枉过正,但我​​假设你正在尝试做一些更复杂的事情。您还可以使用Stdout“watch” Sender,以便它在终止时Sender终止,而不是在收到特定消息时终止。请参阅生命周期监控(又名 DeathWatch)

package fsmTest

import akka.actor._
import akka.util.duration._

sealed trait Msg
case object A extends Msg
case object B extends Msg
case object C extends Msg

sealed trait SenderState
case object Started extends SenderState
case object SentA extends SenderState
case object SentB extends SenderState

case class SenderData()

class Sender(stdout: ActorRef) 
  extends Actor 
  with FSM[SenderState, SenderData] {

  case object GoNextState

  startWith(Started, SenderData())

  when(Started) {
    case Event(GoNextState, data) => {
      stdout ! A
      goto(SentA) using data
    }
  }

  when(SentA) {
    case Event(GoNextState, data) => {
      stdout ! B
      goto(SentB) using data
    }
  }

  when(SentB) {
    case Event(GoNextState, data) => {
      stdout ! C
      goto(Started) using data
    }
  }

//      //Handle messages which aren't explicitly handled in state here
//      whenUnhandled {
//        case Event(SomeCustomStop, data) => {
//          stop(FSM.Shutdown)
//        }
//      }

  setTimer("goNextState", GoNextState, 1 second, repeat = true)

  initialize
}

class Stdout() extends Actor {
  def receive = {
    case msg: Msg => {
      context.watch(sender) //Not sure if you're gonna want to do this here, but you get the point
      println(msg)
    }
    case Terminated(_) => context.stop(self)
  }
}

object FSMTest extends App {

  implicit val system = ActorSystem("Testing")
  val stdout = system.actorOf(Props[Stdout], "stdout")
  val sender = system.actorOf(Props(new Sender(stdout)), "sender")

  system.scheduler.scheduleOnce(5 seconds) {
    system.stop(sender)
    system.shutdown()
  }
  system.awaitTermination(10 seconds)
}

无论您如何在发送者中实现状态,如果您想使用 Actors 对其进行建模,我相信您将需要“自我发送”消息,无论是在事件处理中还是使用上面的计时器。

于 2012-07-10T17:41:59.783 回答
1

从一个反应或接收块中的一个参与者按顺序发送的消息将按该顺序接收。(你可能穿插了来自其他参与者的其他消息,但你不会先发送 A 然后 B 并获得 B 然后 A。)

所以你可以

stdout ! A
stdout ! B
stdout ! C

除非您还需要做其他事情。

于 2012-07-10T18:52:35.577 回答