41

我正在为多线程环境中的键聚合多个值。密钥是事先不知道的。我以为我会做这样的事情:

class Aggregator {
    protected ConcurrentHashMap<String, List<String>> entries =
                            new ConcurrentHashMap<String, List<String>>();
    public Aggregator() {}

    public void record(String key, String value) {
        List<String> newList =
                    Collections.synchronizedList(new ArrayList<String>());
        List<String> existingList = entries.putIfAbsent(key, newList);
        List<String> values = existingList == null ? newList : existingList;
        values.add(value);
    }
}

我看到的问题是,每次运行此方法时,我都需要创建一个新的 an 实例ArrayList,然后我将其丢弃(在大多数情况下)。这似乎是对垃圾收集器的无理滥用。有没有更好的、线程安全的方法来初始化这种结构而不必使用synchronizerecord方法?我对让该方法不返回新创建的元素的决定感到有些惊讶putIfAbsent,并且除非需要(可以这么说),否则缺乏一种推迟实例化的方法。

4

7 回答 7

47

Java 8 引入了一个 API 来解决这个确切的问题,提供了一个单行解决方案:

public void record(String key, String value) {
    entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())).add(value);
}

对于 Java 7:

public void record(String key, String value) {
    List<String> values = entries.get(key);
    if (values == null) {
        entries.putIfAbsent(key, Collections.synchronizedList(new ArrayList<String>()));
        // At this point, there will definitely be a list for the key.
        // We don't know or care which thread's new object is in there, so:
        values = entries.get(key);
    }
    values.add(value);
}

这是填充ConcurrentHashMap.

特殊方法putIfAbsent(K, V))会将您的值对象放入,或者如果另一个线程在您之前获得,那么它将忽略您的值对象。无论哪种方式,在调用之后putIfAbsent(K, V))get(key)线程之间都保证是一致的,因此上面的代码是线程安全的。

唯一浪费的开销是,如果某个其他线程同时为同一个键添加了一个新条目:您最终可能会丢弃新创建的值,但只有在没有条目并且您的线程丢失,这通常很少见。

于 2012-05-24T19:00:53.140 回答
15

从 Java-8 开始,您可以使用以下模式创建 Multi Maps:

public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())) .add(value); }

ConcurrentHashMap 文档(不是通用合同)指定 ArrayList 只会为每个键创建一次,在为新键创建 ArrayList 时延迟更新的初始成本很小:

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-

于 2014-09-01T05:31:56.067 回答
11

最后,我对@Bohemian 的回答进行了轻微修改。他提出的解决方案用调用覆盖了values变量putIfAbsent,这产生了我之前遇到的同样问题。似乎工作的代码如下所示:

    public void record(String key, String value) {
        List<String> values = entries.get(key);
        if (values == null) {
            values = Collections.synchronizedList(new ArrayList<String>());
            List<String> values2 = entries.putIfAbsent(key, values);
            if (values2 != null)
                values = values2;
        }
        values.add(value);
    }

它不像我想要的那样优雅,但它比ArrayList在每次调用时创建一个新实例的原始版本要好。

于 2012-05-24T21:28:08.460 回答
3

根据 Gene 的回答创建了两个版本

public  static <K,V> void putIfAbsetMultiValue(ConcurrentHashMap<K,List<V>> entries, K key, V value) {
    List<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedList(new ArrayList<V>());
        List<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

public  static <K,V> void putIfAbsetMultiValueSet(ConcurrentMap<K,Set<V>> entries, K key, V value) {
    Set<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedSet(new HashSet<V>());
        Set<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

它运作良好

于 2013-07-05T18:21:09.980 回答
3

这是一个我也在寻找答案的问题。该方法putIfAbsent实际上并没有解决额外的对象创建问题,它只是确保这些对象中的一个不会替换另一个。但是线程之间的竞争条件会导致多个对象实例化。我可以为这个问题找到 3 个解决方案(并且我会遵循这个优先顺序):

1-如果您使用的是 Java 8,实现这一目标的最佳方法可能computeIfAbsentConcurrentMap. 您只需要给它一个将同步执行的计算函数(至少对于ConcurrentHashMap实现而言)。例子:

private final ConcurrentMap<String, List<String>> entries =
        new ConcurrentHashMap<String, List<String>>();

public void method1(String key, String value) {
    entries.computeIfAbsent(key, s -> new ArrayList<String>())
            .add(value);
}

这是来自的javadoc ConcurrentHashMap.computeIfAbsent

如果指定的键尚未与值关联,则尝试使用给定的映射函数计算其值并将其输入到此映射中,除非为空。整个方法调用以原子方式执行,因此每个键最多应用一次该函数。在计算过程中,其他线程对该映射的某些尝试更新操作可能会被阻塞,因此计算应该简短而简单,并且不得尝试更新该映射的任何其他映射。

2- 如果你不能使用 Java 8,你可以使用Guava's LoadingCache,这是线程安全的。你给它定义了一个加载函数(就像compute上面的函数一样),你可以确定它会被同步调用。例子:

private final LoadingCache<String, List<String>> entries = CacheBuilder.newBuilder()
        .build(new CacheLoader<String, List<String>>() {
            @Override
            public List<String> load(String s) throws Exception {
                return new ArrayList<String>();
            }
        });

public void method2(String key, String value) {
    entries.getUnchecked(key).add(value);
}

3-如果您也不能使用 Guava,您可以随时手动同步并进行双重检查锁定。例子:

private final ConcurrentMap<String, List<String>> entries =
        new ConcurrentHashMap<String, List<String>>();

public void method3(String key, String value) {
    List<String> existing = entries.get(key);
    if (existing != null) {
        existing.add(value);
    } else {
        synchronized (entries) {
            List<String> existingSynchronized = entries.get(key);
            if (existingSynchronized != null) {
                existingSynchronized.add(value);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(value);
                entries.put(key, newList);
            }
        }
    }
}

我对所有这 3 个方法以及非同步方法做了一个示例实现,这会导致额外的对象创建: http: //pastebin.com/qZ4DUjTr

于 2016-07-22T14:58:58.423 回答
1

Java 1.7.40 处理空数组列表创建问题的内存浪费(还有 GC 等)。不要担心创建空数组列表。参考:http: //javarevisited.blogspot.com.tr/2014/07/java-optimization-empty-arraylist-and-Hashmap-cost-less-memory-jdk-17040-update.html

于 2014-09-01T05:42:09.727 回答
1

具有putIfAbsent最快执行时间的方法,在竞争激烈的环境中比“lambda”方法快 2 到 50 倍。Lambda 不是这种“powerloss”背后的原因,问题是在 Java-9 优化之前computeIfAbsent 内部的强制同步。

基准:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentHashMapTest {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsentLamda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsentLamda() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
                        count.incrementAndGet();

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

}

结果:

Test testPutIfAbsent average time per run: 115.756501 ns
Test testComputeIfAbsentLamda average time per run: 276.9667055 ns
Test testPutIfAbsent average time per run: 134.2332435 ns
Test testComputeIfAbsentLamda average time per run: 223.222063625 ns
Test testPutIfAbsent average time per run: 119.968893625 ns
Test testComputeIfAbsentLamda average time per run: 216.707419875 ns
Test testPutIfAbsent average time per run: 116.173902375 ns
Test testComputeIfAbsentLamda average time per run: 215.632467375 ns
Test testPutIfAbsent average time per run: 112.21422775 ns
Test testComputeIfAbsentLamda average time per run: 210.29563725 ns
Test testPutIfAbsent average time per run: 120.50643475 ns
Test testComputeIfAbsentLamda average time per run: 200.79536475 ns
于 2017-07-07T11:05:28.843 回答