4

我有 Scala 的代码

class MyClass {
  private val myData: Map[String, MyClass2] = new HashMap[String, MyClass2]()

  def someMethod = {
      synchronized(myData) {
        val id = getSomeId
        if (myData.containsKey(id)) myData = myData.remove(id)
        else Log.d("123", "Not found!")
      }
  }

  def getSomeId = //....
}

我想知道,是否可以在不使用synchronized或不涉及其他库(例如Java 或 Scala 中内置的Akka任何其他库(类))的情况下保持此代码的线程安全?

理想情况下,我只想通过使用不变性的概念来使线程安全(final如果你愿意的话,在 Java 中)。

更新

class MyClass(myData: Map[String, MyClass2] = new HashMap[String, MyClass2]()) {

  def someMethod = {
      synchronized(myData) {
        val id = getSomeId
        if (myData.containsKey(id)) new MyClass(myData.remove(id))
        else {
           Log.d("123", "Not found!")
           this
         }
      }
  }

  def getSomeId = //....
}
4

4 回答 4

4

MyClass只有当你也使不可变(并且让它也只使用不可变的数据结构)时,你才能解决不可变的问题。原因很简单:如果MyClass是可变的,那么你必须通过并发线程同步修改。

这需要不同的设计——导致实例MyClass“更改”的每个操作都将返回一个(可能)修改后的实例。

import collection.immutable._

class MyClass2 {
  // ...
}

// We can make the default constructor private, if we want to manage the
// map ourselves instead of allowing users to pass arbitrary maps
// (depends on your use case):
class MyClass private (val myData: Map[String,MyClass2]) {
  // A public constructor:
  def this() = this(new HashMap[String,MyClass2]())

  def someMethod(id: String): MyClass = {
    if (myData.contains(id))
      new MyClass(myData - id) // create a new, updated instance
    else {
      println("Not found: " + id)
      this // no modification, we can return the current
            // unmodified instance
    }
  }

  // other methods for adding something to the map
  // ...
}
于 2013-06-15T14:36:48.877 回答
2

如果您使用 scala 2.10 中的TrieMap,这是一个无锁并发映射实现,您可以避免同步:

import scala.collection.concurrent.TrieMap

class MyClass2

class MyClass {
    private val myData = TrieMap[String, MyClass2]()
    def someMethod = myData -= getSomeId
    def getSomeId = "id"
}
于 2013-06-15T13:30:53.380 回答
1

我建议使用库,因为自己获得正确的并发性很难。例如,您可以使用像TrieMap这样的并发地图。请参阅上面的答案。

但是让我们假设您出于教育目的想要手动执行此操作。使上述线程安全的第一步是使用不可变集合。所以而不是

private val myData: Map[String, MyClass2] = new HashMap[String, MyClass2]()

你会用

private var myData = Map.empty[String, MyClass2] 

(即使这里有一个 var,它的可变状态也比上面的版本少。在这种情况下,唯一可变的东西是单个引用,而在上面的示例中,整个集合都是可变的)

现在你必须处理 var。您必须确保在所有其他线程上“看到”一个线程上的 var 更新。所以你必须将该字段标记为@volatile。如果您有一个仅从一个线程完成写入的发布/订阅方案,那就足够了。但是假设您想从不同的线程读取和写入,您将需要对所有写入访问使用同步。

显然,这足以保证引入一个小助手类:

final class SyncronizedRef[T](initial:T) {
  @volatile private var current = initial

  def get:T = current

  def update(f:T=>T) {
    this synchronized {
      current = f(current)
    }
  }
}

有了这个小助手,上面的代码可以这样实现:

class MyClass {
  val state = new SyncronizedRef(Map.empty[String, MyClass2])

  def someMethod = {
    state.update(myData =>
      val id = getSomeId
      if (myData.containsKey(id)) 
        myData - id
      else { 
        Log.d("123", "Not found!")
        myData
      }
  }

  def getSomeId = //....
}

就地图而言,这将是线程安全的。然而,整个事情是否是线程安全的取决于 getSomeID 中发生的任何事情。

一般来说,只要传递给 update 的东西是一个纯粹的函数,它只是转换数据而没有任何副作用,这种处理并发的方式就可以工作。如果您的状态比单个地图更复杂,那么以纯粹的函数式样式编写更新可能会非常具有挑战性。

SynchronizedRef 中仍然存在低级多线程原语,但您的程序逻辑完全没有它们。您只需通过组合纯函数来描述程序的状态如何响应外部输入而变化。

无论如何,这个特定示例的最佳解决方案就是使用现有的并发映射实现。

于 2013-06-15T13:46:01.173 回答
1

每当您共享可变状态时,您都需要一种并发机制。正如 Rüdiger 指出的那样,最好的办法是确定您拥有哪种并发场景,然后使用最能解决该场景的现有工具:

  • 普通的旧 Java 同步锁
  • 原子比较和交换
  • 软件事务内存(例如 Scala-STM)
  • 消息传递/演员

或者,当然,如果您不需要尽可能高的性能,您可以使用协作多任务(在单个共享线程上运行并发进程)。

如果您的班级状态是可监督的,您可以使该班级完全不可变,例如:

case class MyClass2()

case class MyClass(myData: Map[String, MyClass2] = Map.empty) {
  def someMethod = {
    val id = getSomeId
    if (myData.contains(id)) copy(myData = myData - id)
    else throw new IllegalArgumentException(s"Key $id not found")
  }

  def getSomeId = "foo" // ...
}

一个实例MyClass只能通过将数据复制到一个新实例来改变,因此多个线程可以安全地引用同一个实例。但另一方面,如果两个线程 A 和 B 从同一个实例开始foo1,并且其中任何一个对其进行变异并希望另一个线程看到该变异,那么您需要以某种形式再次共享该变异状态(使用 STM 参考单元,通过参与者发送消息,将其存储在同步变量中等)

val foo1 = MyClass(Map("foo" -> MyClass2()))
val foo2 = foo1.someMethod  // foo1 is untouched
于 2013-06-16T09:31:23.117 回答