0

我目前正在研究一种与 Scala 一起使用的日志记录机制,但我遇到了一个意外问题,阻止了我实际研究它。为了测试功能,我希望设置一个简单的消息传递环。在环内,每个节点都是 Scala Actor 的扩展,并且知道它的直接邻居(上一个/下一个)。

环构造如下完成,参数“nodes”从控制器参与者传递:

def buildRing(nodes:Array[Actor]){
  var spliceArr:Array[Actor] = new Array[Actor](2)
  spliceArr(0) = nodes(nodes.length-1)
  spliceArr(1) = nodes(1)
  nodes(0) !  spliceArr
  Thread.sleep(100)
  spliceArr(0) = nodes(nodes.length-2)
  spliceArr(1) = nodes(0)
  nodes(nodes.length-1) ! spliceArr
  Thread.sleep(100)
  for(i <-1 to numNodes-2){
      spliceArr(0) = nodes(i-1)
      spliceArr(1) = nodes(i+1)
      nodes(i) ! spliceArr
      Thread.sleep(100)
  }
}

这似乎可以按我的意愿运行,每个节点都接收到正确的邻居对。在节点类中有一个大小为 2 的数组,设置如下:

class node(locLogger:logger,nid:Int,boss:Actor) extends Actor{
  val ringNeighbors:Array[Actor] = new Array[Actor](2)
  def act{
    locLogger.start
    loop{
        receive{
            case n:Array[Actor] =>
              ringNeighbors(0) = n(0)
              ringNeighbors(1) = n(1)

一切都很好,但是,当我引入要在环周围传递的消息时(从节点 0 开始),我发现每个节点现在在它的 ringNeighbors 数组中都有相同的值。这些值与 ringBuilder(即节点 8 的邻居)函数中循环的最终迭代一致。没有发生额外的消息传递,所以我不明白这些值是如何为节点数组中的每个实例修改的,因此是环。

我仍在学习 Scala 的基本知识,希望我没有错误地忽略一些简单的事情。

4

1 回答 1

3

我认为问题如下:

Actor 模型本质上是一个异步模型,这意味着 Actor 有时会处理消息,而与发送时间无关。

您正在向每个参与者发送对大小为 2 数组的引用,该数组会根据迭代的状态不断更改其内容。但是,actor 不会在调用之后立即处理初始化消息nodes(i) ! spliceArr。所以可能发生的是迭代完成,并且只有在这之后,参与者才被安排来处理消息。spliceArr问题是,当 for 循环完成时,他们都看到了实例。

所以简单的解决方案是不发送数组而是发送一对:

nodes(i) ! spliceArr

变成

nodes(i) ! (nodes(i-1), nodes(i+1))

并且您还应该在循环之前修改相应的行。这种改变也应该在参与者的代码中执行——使用元组而不是数组来处理这种东西。

如果我的猜测是正确的,那么核心问题是您正在使用可变数据结构(在您的情况下为数组),这些数据结构在各种实体(在您的示例中为演员)之间共享。这总是会导致问题,因此除非您正在处理的应用程序确实特别需要有状态的数据结构,否则您应该始终将赌注押在不变性上。

现在,在参与者系统的特定情况下,参与者之间交换的消息更需要是不可变的。Actor 应该是封闭的数据结构,它们的状态不应该从外部访问。此外,在参与者系统中不应该有全局状态。

不幸的是,与其他实现 Actor 系统的语言(如 Erlang)不同,Scala 无法强制执行此行为。因此,确保发生这种情况是开发人员的工作。

可变消息是不好的,因为它们会导致参与者共享状态——消息中包含的状态在参与者并发执行的上下文中可能会导致难以发现的问题。

以下是上述修复后代码的外观:

def buildRing(nodes: Array[Actor]) {
    nodes.zipWithIndex.foreach {
        case (actor, index) => actor ! (previous(nodes, index), next(nodes, index))
    }
}

//Gets the next actor from the ring for the specified index.
def next(nodes: Array[Actor], index: Int): Actor = {
    val k = (index + 1) % nodes.length
    nodes(k)
}

//Gets the previous actor
def previous(nodes: Array[Actor], index: Int): Actor = {
    val k = if (index == 0) nodes.length - 1 else index - 1
    nodes(k)
}

class Node(locLogger:logger, nid:Int, boss:Actor) extends Actor {
    private var leftNeighbour: Option[Actor] = None //avoid using null in favor of Option
    private var rightNeighbour: Option[Actor] = None
    def act {
        locLogger.start
        loop {
            receive {
                case (left, right) => {
                    leftNeighbour = Some(left)
                    rightNeighbour = Some(right)
                }
            }
        }
    }
}

为了提高算法的可读性,我还做了一些更改,希望您不要介意。

于 2012-11-19T23:46:14.540 回答