1

我有一个像Map<Key, Set<Value>>. 我正在尝试实现以下场景:

  1. 几个生产者更新此映射,将新值添加到已经存在的键或新键(在这种情况下会创建新的映射条目)。
  2. 消费者定期从映射中轮询一些有限数量的条目并将它们传递给处理器。

这是我的看法:

private static final MAX_UPDATES_PER_PASS = 100;

private final ConcurrentHashMap<Key, Set<Value>> updates = new ConcurrentHashMap<Key, Set<Value>>();

@Override
public void updatesReceived(Key key, Set<Value> values) {
    Set<Value> valuesSet = updates.get(key);
    if (valuesSet == null){
        valuesSet = Collections.newSetFromMap(new ConcurrentHashMap<Value, Boolean>());
        Set<Value> previousValues = updates.putIfAbsent(key, valuesSet);
        if (previousValues != null){
            valuesSet = previousValues;
        }
    }
    valuesSet.addAll(values);
}

private class UpdatesProcessor implements Runnable {

    @Override
    public void run() {
        int updatesProcessed = 0;
        Map<Key, Set<Value>> valuesToProcess = new HashMap<Key, Set<Value>>();
        Iterator<Map.Entry<Key, Set<Value>>> iterator = updates.entrySet().iterator();
        while(iterator.hasNext() && updatesProcessed < MAX_UPDATES_PER_PASS) {
            Map.Entry<Key, Set<Value>> next = iterator.next();
            iterator.remove(); // <-- here 
            Key key = next.getKey();
            Set<Value> values = valuesToProcess.get(key);
            if (values == null){
                values = new HashSet<Value>();
                valuesToProcess.put(key, values);
            }
            values.addAll(next.getValue());
            updatesProcessed++;
        }
        if (!valuesToProcess.isEmpty()){
            process(valuesToProcess);
        }
    }
}

该方法updatesRecevied()由来自任意线程的值的生产者调用。计划通过UpdatesProcessor定期执行ScheduledExecutorService,因此它也可以从任意线程调用。

每个值都应该只处理一次。不多不少。我不在乎一个值迟早会被处理,但最终它应该。

我希望它又快又猛,所以我不想把synchronize所有事情都搞砸。

这个带有迭代器的笨拙代码UpdatesProcessor服务于一个单一的目标,如果有类似ConcurrentHashMap.poll(). 但是没有。

所以,对于问题。首先,这是否保证有效?在我调用iterator.remove()该条目后,该条目将从地图中删除,并且每个附加值都会进入新条目的集合,对吗?

其次,我是不是把事情复杂化了?是否有一种通用的方法来处理这种场景(数据结构)?

4

0 回答 0