2

我正在尝试在 java 中编写线程安全的 Map[K, Set[V]] 实现。

  1. 如果将唯一键添加到地图中,则应创建(并添加到)新 Set
  2. 如果将非唯一键添加到映射中,则应将现有 Set 添加到。
  3. 如果从 Set 中删除一个值导致 Set 为空,则应从映射中删除该条目以避免内存泄漏。
  4. 我想解决这个问题而不需要同步整个事情

我在下面包含了一个失败的测试用例,如果您有解决方案,请告诉我。

package org.deleteme;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import junit.framework.Assert;

import org.junit.Test;

public class ConcurrentSetMapTest {
    public static class ConcurrentSetMap<K, V> {
        private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();

        public void add(K key, V value) {
            Set<V> set = map.get(key);
            if (set != null) {
                set.add(value);
            } else {
                Set<V> candidateSet = createConcurrentSet(value);
                set = map.putIfAbsent(key, candidateSet);
                if (set != null) {
                    // candidate set not accepted, use existing
                    set.add(value);
                }
            }
        }

        public void remove(K key, V value) {
            Set<V> set = map.get(key);
            if (set != null) {
                boolean removed = set.remove(value);
                if (removed && set.isEmpty()) {
                    // this is not thread-safe and causes the test to fail
                    map.remove(key, set);
                }
            }
        }

        public boolean contains(K key, V value) {
            Set<V> set = map.get(key);
            if (set == null) {
                return false;
            }
            return set.contains(value);
        }

        protected Set<V> createConcurrentSet(V element) {
            Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
            set.add(element);
            return set;
        }
    }

    @Test
    public void testThreadSafe() throws InterruptedException, ExecutionException {
        ConcurrentSetMap<String, String> setMap = new ConcurrentSetMap<String, String>();
        ExecutorService executors = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<Future<?>>();

        futures.add(executors.submit(new TestWorker(setMap, "key1")));
        futures.add(executors.submit(new TestWorker(setMap, "key1")));
        futures.add(executors.submit(new TestWorker(setMap, "key2")));
        futures.add(executors.submit(new TestWorker(setMap, "key2")));

        for (Future<?> future : futures) {
            future.get();
        }
    }

    public static class TestWorker implements Runnable {
        ConcurrentSetMap<String, String> setMap;
        String key;

        public TestWorker(ConcurrentSetMap<String, String> setMap, String key) {
            super();
            this.setMap = setMap;
            this.key = key;
        }

        public void run() {
            int sampleSize = 100000;
            for (int i = 0; i < sampleSize; ++ i) {
                // avoid value clashes with other threads
                String value = Thread.currentThread().getName() + i;

                Assert.assertFalse("Should not exist before add", setMap.contains(key, value));
                setMap.add(key, value);
                Assert.assertTrue("Should exist after add", setMap.contains(key, value));
                setMap.remove(key, value);
                Assert.assertFalse("Should not exist after remove", setMap.contains(key, value));
            }
        }
    }
}
4

5 回答 5

4

不要写这样的地图,用别人的。我会使用 Guava 的SetMultimap实现之一,例如HashMultimap并使用Multimaps.synchronizedSetMultimap对其进行同步。

于 2012-08-30T11:36:08.333 回答
2

这是一个完全并发且线程安全的实现:

public class ConcurrentSetMap<K,V> {

  private final ConcurrentMap<K, Set<V>> _map = new ConcurrentHashMap<K, Set<V>>();

  public void add(K key, V value) {
    Set<V> curSet = _map.get(key);
    while(true) {

      if((curSet != null) && curSet.contains(value)) {
        break;
      }

      Set<V> newSet = new HashSet<V>();
      newSet.add(value);

      if(curSet == null) {

        curSet = _map.putIfAbsent(key, newSet);
        if(curSet != null) {
          continue;
        }

      } else {

        newSet.addAll(curSet);
        if(!_map.replace(key, curSet, newSet)) {
          curSet = _map.get(key);
          continue;
        }
      }

      break;
    }
  }

  public void remove(K key, V value) {
    Set<V> curSet = _map.get(key);

    while(true) {
      if((curSet == null) || !curSet.contains(value))  {
        break;
      }

      if(curSet.size() == 1) {

        if(!_map.remove(key, curSet)) {
          curSet = _map.get(key);
          continue;
        }

      } else {

        Set<V> newSet = new HashSet<V>();
        newSet.addAll(curSet);
        newSet.remove(value);
        if(!_map.replace(key, curSet, newSet)) {
          curSet = _map.get(key);
          continue;
        }
      }

      break;
    }
  }

  public boolean contains(K key, V value) {
    Set<V> set = _map.get(key);
    return set != null && set.contains(value);
  }
}

将时间与@PeterLawrey 的答案(在我的盒子上)进行比较,他需要 2.9 秒,这需要 1.4 秒。

于 2012-08-30T15:40:21.730 回答
2

我设法解决了我的问题:)

我在最初的帖子中没有提到我需要从集合中快速读取,而且我不太关心写入速度。为此,我想出了一个同步写访问但不需要同步读访问的解决方案。下面的代码现在通过了我的测试用例。

感谢大家的建议。

public static class ConcurrentSetMap<K, V> {
    private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();

    public synchronized void add(K key, V value) {
        Set<V> set = map.get(key);
        if (set != null) {
            set.add(value);
        } else {
            map.put(key, createConcurrentSet(value));
        }
    }

    public synchronized void remove(K key, V value) {
        Set<V> set = map.get(key);
        if (set != null) {
            set.remove(value);
            if (set.isEmpty()) {
                map.remove(key);
            }
        }
    }

    public boolean contains(K key, V value) {
        return get(key).contains(value);
    }

    public Set<V> get(K key) {
        Set<V> set = map.get(key);
        return set == null ? Collections.<V> emptySet() : set;
    }

    protected Set<V> createConcurrentSet(V value) {
        Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
        set.add(value);
        return set;
    }
} 
于 2012-08-31T09:47:37.120 回答
1

您需要使用锁定,因为您正在执行多个需要共同原子的操作。

public class SynchronousMultiMap<K, V> {
    private final Map<K, Set<V>> map = new LinkedHashMap<K, Set<V>>();

    public synchronized void add(K key, V value) {
        Set<V> set = map.get(key);
        if (set == null)
            map.put(key, set = new LinkedHashSet<V>());
        set.add(value);
    }

    public synchronized void remove(K key, V value) {
        Set<V> set = map.get(key);
        if (set == null) return;
        set.remove(value);
        if (set.isEmpty()) map.remove(key);
    }

    public synchronized boolean contains(K key, V value) {
        Set<V> set = map.get(key);
        return set != null && set.contains(value);
    }

    @Test
    public void testThreadSafe() throws ExecutionException, InterruptedException {
        ExecutorService executors = Executors.newFixedThreadPool(3);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        SynchronousMultiMap<String, Integer> setMap = new SynchronousMultiMap<String, Integer>();
        int sampleSize = 1000000;

        String[] keys = "key1,key2,key3,key4".split(",");
        for (int i = 0; i < 3; i++)
            futures.add(executors.submit(new TestWorker(setMap, keys, sampleSize, i)));

        executors.shutdown();
        for (Future<?> future : futures) {
            future.get();
        }
    }

    static class TestWorker implements Runnable {
        final SynchronousMultiMap<String, Integer> setMap;
        final String[] keys;
        final int sampleSize;
        final int value;

        public TestWorker(SynchronousMultiMap<String, Integer> setMap, String[] keys, int sampleSize, int value) {
            super();
            this.setMap = setMap;
            this.keys = keys;
            this.sampleSize = sampleSize;
            this.value = value;
        }

        public void run() {
            for (int i = 0; i < sampleSize; i += keys.length) {
                for (String key : keys) {
                    boolean contains = setMap.contains(key, value);
                    if (contains)
                        Assert.assertFalse("Should not exist before add", contains);
                    setMap.add(key, value);
                    boolean contains2 = setMap.contains(key, value);
                    if (!contains2)
                        Assert.assertTrue("Should exist after add", contains2);
                    setMap.remove(key, value);
                    boolean contains3 = setMap.contains(key, value);
                    if (contains3)
                        Assert.assertFalse("Should not exist after remove", contains3);
                }
            }
        }
    }
}

运行需要 0.35 秒。使用 asampleSize=1000000它需要 < 8 秒。

于 2012-08-30T11:34:25.607 回答
1

使用ConcurrentMapConcurrentSkipListSet

ConcurrentMap.putIfAbsent()如果键不存在,则调用以添加键集。

ConcurrentMap.remove( key, emptySet ),哪里emptySet是空的ConcurrentSkipListSet

如果与 对应的当前值为key空,则将其删除。

于 2014-11-03T22:21:02.227 回答