我正在尝试在 java 中编写线程安全的 Map[K, Set[V]] 实现。
- 如果将唯一键添加到地图中,则应创建(并添加到)新 Set
- 如果将非唯一键添加到映射中,则应将现有 Set 添加到。
- 如果从 Set 中删除一个值导致 Set 为空,则应从映射中删除该条目以避免内存泄漏。
- 我想解决这个问题而不需要同步整个事情
我在下面包含了一个失败的测试用例,如果您有解决方案,请告诉我。
package org.deleteme;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import junit.framework.Assert;
import org.junit.Test;
public class ConcurrentSetMapTest {
public static class ConcurrentSetMap<K, V> {
private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();
public void add(K key, V value) {
Set<V> set = map.get(key);
if (set != null) {
set.add(value);
} else {
Set<V> candidateSet = createConcurrentSet(value);
set = map.putIfAbsent(key, candidateSet);
if (set != null) {
// candidate set not accepted, use existing
set.add(value);
}
}
}
public void remove(K key, V value) {
Set<V> set = map.get(key);
if (set != null) {
boolean removed = set.remove(value);
if (removed && set.isEmpty()) {
// this is not thread-safe and causes the test to fail
map.remove(key, set);
}
}
}
public boolean contains(K key, V value) {
Set<V> set = map.get(key);
if (set == null) {
return false;
}
return set.contains(value);
}
protected Set<V> createConcurrentSet(V element) {
Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
set.add(element);
return set;
}
}
@Test
public void testThreadSafe() throws InterruptedException, ExecutionException {
ConcurrentSetMap<String, String> setMap = new ConcurrentSetMap<String, String>();
ExecutorService executors = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<Future<?>>();
futures.add(executors.submit(new TestWorker(setMap, "key1")));
futures.add(executors.submit(new TestWorker(setMap, "key1")));
futures.add(executors.submit(new TestWorker(setMap, "key2")));
futures.add(executors.submit(new TestWorker(setMap, "key2")));
for (Future<?> future : futures) {
future.get();
}
}
public static class TestWorker implements Runnable {
ConcurrentSetMap<String, String> setMap;
String key;
public TestWorker(ConcurrentSetMap<String, String> setMap, String key) {
super();
this.setMap = setMap;
this.key = key;
}
public void run() {
int sampleSize = 100000;
for (int i = 0; i < sampleSize; ++ i) {
// avoid value clashes with other threads
String value = Thread.currentThread().getName() + i;
Assert.assertFalse("Should not exist before add", setMap.contains(key, value));
setMap.add(key, value);
Assert.assertTrue("Should exist after add", setMap.contains(key, value));
setMap.remove(key, value);
Assert.assertFalse("Should not exist after remove", setMap.contains(key, value));
}
}
}
}