0

我的课程从 ConcurrentHashmap[String,immutable.List[String]] 扩展

它有两种方法:

  def addEntry(key: String, newList: immutable.List[String]) = {
    ...
    //if key exist,appending the newList to the exist one 
    //otherwise set the newList as the value
  }

  def resetEntry(key: String): Unit = {
    this.remove(key)
  }

为了使 addEntry 方法线程安全,我尝试了:

this.get(key).synchronized{
  //append or set here
}

但是如果 key 不存在会引发空指针异常,并且在同步之前使用 putIfAbsent(key, new immutable.List()) 将不起作用,因为在 putIfAbsent 和之前进入同步块之后,key 可能会被 resetEntry 删除。

使 addEntry 和 resetEntry 都同步方法可以工作,但锁太大

那么,我能做些什么呢?

ps.这篇文章与如何在 ConcurrentHashMap 线程安全中更新 BigDecimal类似,同时请帮助我弄清楚如何编写除一般指南之外的代码

--update-- checkout https://stackoverflow.com/a/34309186/404145,在将近 3 年后解决了这个问题。

4

3 回答 3

1

除了删除条目,您可以简单地清除它吗?您仍然可以使用同步列表并确保原子性。

  def resetEntry(key: String, currentBatchSize: Int): Unit = {
    this.get(key).clear();
  }

这适用于每个键都有一个条目的假设。例如,如果this.get(key)==null您想插入一个新的sychronizedList也应该作为清除的。

于 2012-07-16T16:11:06.760 回答
1

3年多后,我想现在我可以回答我的问题了。

原来的问题是:

我得到一个ConcurrentHashMap[String, List],许多线程正在向它附加值,我怎样才能使它成为线程安全的?

使addEntry()同步将起作用,对吗?

synchronize(map.get(key)){
  map.append(key, value)
}

在大多数情况下是的,除非map.get(key)为 null,这将导致 NullPointerException。

那么map.putIfAbsent(key, new List)像这样添加呢:

map.putIfAbsent(key, new List)
synchronize(map.get(key)){
  map.append(key, value)
}

现在好多了,但是如果在putIfAbsent()另一个线程调用之后resetEntry(),我们将再次看到 NullPointerException。

使 addEntry 和 resetEntry 两个同步方法都可以工作,但是锁太大了。

那么追加时的MapEntry Level Lock和重置时的Map Level Lock呢?

ReentrantReadWriteLock 来了:在调用时addEntry(),我们获取映射的共享锁,这使得追加尽可能并发,在调用时resetEntry(),我们获取排他锁,以确保没有其他线程同时更改映射。

代码如下所示:

class MyMap extends ConcurrentHashMap{
  val lock = new ReentrantReadWriteLock();  

  def addEntry(key: String, newList: immutable.List[String]) = {
    lock.readLock.lock()

    //if key exist,appending the newList to the exist one 
    //otherwise set the newList as the value
    this.putIfAbsent(key, new List())
    this(key).synchronized{
        this(key) += newList
    }

    lock.readLock.unlock()
  }

  def resetEntry(key: String, currentBatchSize: Int): Unit = {
    lock.writeLock.lock()

    this.remove(key)

    lock.writeLock.unlock()
  }
}
于 2015-12-16T10:12:34.530 回答
0

您可以尝试受 CAS(比较和交换)过程启发的方法:

(在伪 java-scala 代码中,因为我的 Scala 仍处于起步阶段)

def addEntry(key: String, newList: immutable.List[String]) = {
    val existing = putIfAbsent(key, newList); 
    if (existing != null) {
       synchronized(existing) {
           if (get(key) == existing) { // ask again for the value within the synchronized block to ensure consistence. This is the compare part of CAS
                return put(key,existing ++ newList); // Swap the old value by the new
           } else {
               throw new ConcurrentModificationException(); // how else mark failure?
           }
       }
    }
    return existing;
}
于 2012-07-16T17:13:56.673 回答