18

我有一个用例,我必须

  • 如果 ConcurrentHashMap 中不存在该键,则插入一个新值
  • 如果键已经存在于 ConcurrentHashMap 中,则用新值替换旧值,其中新值是从旧值派生的(不是昂贵的操作)

我提供以下代码:

public void insertOrReplace(String key, String value) {
        boolean updated = false;
        do {
            String oldValue = concurrentMap.get(key);
            if (oldValue == null) {
                oldValue = concurrentMap.putIfAbsent(key, value);
                if (oldValue == null) {
                    updated = true;
                }
            }
            if (oldValue != null) {
                final String newValue = recalculateNewValue(oldValue, value);
                updated = concurrentMap.replace(key, oldValue, newValue);
            }
        } while (!updated);
    }

你认为它是正确的和线程安全的吗?

有没有更简单的方法?

4

4 回答 4

9

您可以使用下面与您的代码等效的代码将其缩短一点。我已经对它进行了一些压力测试,数千个线程同时访问它:它按预期工作,执行了多次重试(循环)(显然,你永远无法通过并发世界中的测试来证明正确性)。

public void insertOrReplace(String key, String value) {
    for (;;) {
        String oldValue = concurrentMap.putIfAbsent(key, value);
        if (oldValue == null)
            return;

        final String newValue = recalculateNewValue(oldValue, value);
        if (concurrentMap.replace(key, oldValue, newValue))
            return;
    }
}
于 2012-04-24T21:56:49.703 回答
2

您的方法似乎是线程安全的。如果您不需要 ConcurrentHashMap 的性能优势,请考虑改用常规 HashMap 并同步对它的所有访问。你的方法类似于 AtomicInteger.getAndSet(int),所以应该没问题。我怀疑是否有更简单的方法可以做到这一点,除非您正在寻找图书馆调用来为您完成工作。

于 2012-04-24T20:19:48.317 回答
2

我不认为这是正确的。据我了解,merge() 方法将是这项工作的正确工具。我目前有同样的问题,并写了一个小测试来查看结果。

该测试启动了 100 名工人。他们每个人都将地图中的值增加 100 次。所以预期的结果是 10000。

有两种类型的工人。一种使用替换算法并使用合并的算法。该测试使用不同的实现运行两次。

import java.util.concurrent.ArrayBlockingQueue;                                                                     
import java.util.concurrent.ConcurrentHashMap;                                                                      
import java.util.concurrent.ConcurrentMap;                                                                          
import java.util.concurrent.ExecutorService;                                                                        
import java.util.concurrent.ThreadPoolExecutor;                                                                     
import java.util.concurrent.TimeUnit;                                                                               

public class ConcurrentMapTest                                                                                      
{                                                                                                                   

   private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();                                   

   private final class ReplaceWorker implements Runnable                                                            
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));                                       
            if(putIfAbsent == null)                                                                                 
               return;                                                                                              
            map.replace("key", putIfAbsent + 1);                                                                    
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   private final class MergeWorker implements Runnable                                                              
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            map.merge("key", Integer.valueOf(1), (ov, nv) -> {                                                      
               return ov + 1;                                                                                       
            });                                                                                                     
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   public MergeWorker newMergeWorker()                                                                              
   {                                                                                                                
      return new MergeWorker();                                                                                     
   }                                                                                                                

   public ReplaceWorker newReplaceWorker()                                                                          
   {                                                                                                                
      return new ReplaceWorker();                                                                                   
   }                                                                                                                

   public static void main(String[] args)                                                                           
   {                                                                                                                
      map.put("key", 1);                                                                                            
      ConcurrentMapTest test = new ConcurrentMapTest();                                                             
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newMergeWorker());                                                                  
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      

      map.put("key", 1);                                                                                            
      threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));      
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newReplaceWorker());                                                                
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      
   }                                                                                                                

   private static void awaitTermination(ExecutorService threadPool)                                                 
   {                                                                                                                
      try                                                                                                           
      {                                                                                                             
         threadPool.shutdown();                                                                                     
         boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);                               
         System.out.println("terminted successfull: " + awaitTermination);                                          
      }                                                                                                             
      catch (InterruptedException e)                                                                                
      {                                                                                                             
         // TODO Auto-generated catch block                                                                         
         e.printStackTrace();                                                                                       
      }                                                                                                             
   }                                                                                                                
}                                                                                          
结果:
终止成功:真
10000
终止成功:真
1743

问题是在您的情况下,get 和 put 之间存在差距,因此对地图结果的并发访问会被覆盖。使用合并它是一个原子操作,尽管文档没有说明它。

于 2014-06-30T08:57:56.587 回答
2

您可以MutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter)Eclipse Collections使用。

如果地图中没有任何值,则该factory参数会创建一个初始值。该function参数与附加参数一起应用于地图值,以得出新的地图值。这parameter作为最终参数传递给updateValueWith(). 即使键不在地图中,也会调用该函数。所以初始值实际上是function应用于 和 的factory输出parameterfunction不能改变值;它应该返回一个新值。在您的示例中,地图值是不可变的字符串,所以我们很好。

在 ConcurrentMaps likeorg.eclipse.collections.impl.map.mutable.ConcurrentHashMap中, 的实现updateValueWith()也是线程安全和原子的。重要的function是不要改变映射值,否则它不是线程安全的。它应该返回新值。在您的示例中,地图值是不可变的字符串,所以我们很好。

如果您的方法recalculateNewValue()只是进行字符串连接,那么您可以使用updateValueWith().

Function0<String> factory = () -> "initial ";
Function2<String, String, String> recalculateNewValue = String::concat;

MutableMap<String, String> map = new ConcurrentHashMap<>();
map.updateValueWith("test", factory, recalculateNewValue, "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.updateValueWith("test", factory, recalculateNewValue, "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));

您可以使用 Java 8 的ConcurrentMap.compute(K key, BiFunction remappingFunction)来完成同样的事情,但它有一些缺点。

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
  • 没有单独的工厂来处理缺少键的情况,因此 lambda 的主体必须处理值和初始值。
  • API 不适合重用 lambda。每次调用都updateValueWith()共享相同的 lambda,但每次调用compute()都会在堆上创建新的垃圾。

注意:我是 Eclipse Collections 的提交者

于 2014-07-10T19:14:29.870 回答