3

我正在尝试创建一个ConcurrentHashMap支持“快照”以提供一致的迭代器,并且想知道是否有更有效的方法来做到这一点。问题是,如果同时创建了两个迭代器,那么它们需要读取相同的值,而并发哈希映射的弱一致性迭代器的定义并不能保证会是这种情况。如果可能的话,我还想避免锁定:映射中有几千个值,处理每个项目需要几十毫秒,我不想在这段时间内阻止写入器,因为这可能会导致写入器阻塞一分钟或更长时间。

到目前为止我所拥有的:

  1. 键是字符串,其ConcurrentHashMap's值是ConcurrentSkipListMap<Long, T>
  2. 当一个元素添加到 hashmap 时putIfAbsent,会分配一个新的跳过列表,并通过添加对象skipList.put(System.nanoTime(), t)
  3. 为了查询地图,我使用map.get(key).lastEntry().getValue()返回最新的值。为了查询快照(例如使用迭代器),我使用map.get(key).lowerEntry(iteratorTimestamp).getValue(),其中iteratorTimestampSystem.nanoTime()迭代器初始化时调用的结果。
  4. 如果一个对象被删除,我使用map.get(key).put(timestamp, SnapShotMap.DELETED),其中 DELETED 是一个静态的最终对象。

问题:

  1. 是否有一个库已经实现了这个?或者除此之外,是否存在比ConcurrentHashMap和更合适的数据结构ConcurrentSkipListMap?我的键是可比较的,所以也许某种并发树会比并发哈希表更好地支持快照。
  2. 我如何防止这个东西不断增长?在 X 上或之前初始化的所有迭代器完成之后,我可以删除所有键小于 X 的跳过列表条目(映射中的最后一个键除外),但我不知道确定何时的好方法这已经发生了:我可以在其方法返回 false 时标记迭代器已完成hasNext,但并非所有迭代器都必须运行完成;我可以保留一个WeakReference迭代器,以便我可以检测它何时被垃圾收集,但我想不出一个好的方法来检测这个,除了使用一个遍历弱引用集合然后休眠几个的线程分钟 - 理想情况下线程会阻塞WeakReference并在包装的引用被 GC 时收到通知,但我认为这不是一个选项。

    ConcurrentSkipListMap<Long, WeakReference<Iterator>> iteratorMap;
    while(true) {
        long latestGC = 0;
        for(Map.Entry<Long, WeakReference<Iterator>> entry : iteratorMap.entrySet()) {
            if(entry.getValue().get() == null) {
                iteratorMap.remove(entry.getKey());
                latestGC = entry.getKey();
            } else break;
        }
        // remove ConcurrentHashMap entries with timestamps less than `latestGC`
        Thread.sleep(300000); // five minutes
    }
    

编辑:为了消除答案和评论中的一些混淆,我目前正在将弱一致性迭代器传递给公司另一个部门编写的代码,他们要求我提高迭代器一致性的强度。他们已经意识到我做 100% 一致的迭代器是不可行的,他们只是希望我尽最大努力。他们更关心吞吐量而不是迭代器的一致性,因此粗粒度锁不是一种选择。

4

3 回答 3

4

您需要特殊实现的实际用例是什么?来自ConcurrentHashMap的Javadoc (强调添加):

检索反映了最近完成的更新操作在其开始时保持的结果。...迭代器和枚举返回的元素反映了在迭代器/枚举创建时或之后的某个时刻哈希表的状态。它们不会抛出 ConcurrentModificationException。但是,迭代器被设计为一次只能由一个线程使用。

因此,常规ConcurrentHashMap.values().iterator()将为您提供一个“一致的”迭代器,但仅供单个线程一次性使用。如果您需要多次和/或通过多个线程使用相同的“快照”,我建议制作地图的副本。

编辑:有了新信息和对“高度一致”迭代器的坚持,我提供了这个解决方案。请注意,使用 ReadWriteLock 具有以下含义:

  • 写入将被序列化(一次只有一个写入器),因此写入性能可能会受到影响。
  • 只要没有正在进行的写入,就允许并发读取,因此读取性能的影响应该是最小的。
  • 活动的读者会阻止写者,但只要它需要检索对当前“快照”的引用。一旦线程拥有快照,它就不再阻塞写入者,无论处理快照中的信息需要多长时间。
  • 当任何写入处于活动状态时,读取器会被阻止;一旦写入完成,所有读者都可以访问新快照,直到新的写入替换它。

一致性是通过序列化写入并在每次写入时复制当前值来实现的。持有对“陈旧”快照的引用的读者可以继续使用旧快照而不必担心修改,并且垃圾收集器将在没有人再使用旧快照时回收旧快照。假设读者不需要从较早的时间点请求快照。

由于快照可能在多个并发线程之间共享,因此快照是只读的,无法修改。此限制也适用于从快照创建remove()的任何Iterator实例的方法。

import java.util.*;
import java.util.concurrent.locks.*;

public class StackOverflow16600019 <K, V> {
    private final ReadWriteLock locks = new ReentrantReadWriteLock();
    private final HashMap<K,V> map = new HashMap<>();
    private Collection<V> valueSnapshot = Collections.emptyList();

    public V put(K key, V value) {
        locks.writeLock().lock();
        try {
            V oldValue = map.put(key, value);
            updateSnapshot();
            return oldValue;
        } finally {
            locks.writeLock().unlock();
        }
    }

    public V remove(K key) {
        locks.writeLock().lock();
        try {
            V removed = map.remove(key);
            updateSnapshot();
            return removed;
        } finally {
            locks.writeLock().unlock();
        }
    }

    public Collection<V> values() {
        locks.readLock().lock();
        try {
            return valueSnapshot; // read-only!
        } finally {
            locks.readLock().unlock();
        }
    }

    /** Callers MUST hold the WRITE LOCK. */
    private void updateSnapshot() {
        valueSnapshot = Collections.unmodifiableCollection(
            new ArrayList<V>(map.values())); // copy
    }
}
于 2013-05-17T02:00:51.503 回答
3

我发现ctrie是理想的解决方案 - 它是一个具有恒定时间快照的并发哈希数组映射的 trie

于 2013-06-16T17:06:06.180 回答
0

解决方案1)仅在看跌期权和迭代上同步怎么样。这应该给你一个一致的快照。

解决方案2)开始迭代并创建一个布尔值,然后覆盖puts,putAll,以便它们进入队列,当迭代完成时,只需使用更改后的值制作这些puts。

于 2013-05-17T03:01:54.647 回答