2

我正在尝试创建一个RecursiveTask<Map<Short, Long>>

我使用这篇文章作为参考

public class SearchTask2 extends RecursiveTask<Map<Short, Long>> {

    private final int majorDataThreshold = 16000;

    private ConcurrentNavigableMap<Short, Long> dataMap;
    private long fromRange;
    private long toRange;
    private boolean fromInclusive;
    private boolean toInclusive;

    public SearchTask2(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
            final boolean fromInclusive, final boolean toInclusive) {
        System.out.println("SearchTask ::  ");
        this.dataMap = new ConcurrentSkipListMap<>(dataSource);
        this.fromRange = fromRange;
        this.toRange = toRange;
        this.fromInclusive = fromInclusive;
        this.toInclusive = toInclusive;
    }

    @Override
    protected Map<Short, Long> compute() {
        System.out.println("SearchTask :: compute ");
        //Map<Short, Long> result = new HashMap<>();
        int size = dataMap.size();
        if (size > majorDataThreshold + 2500) {
             return ForkJoinTask.invokeAll(createSubtasks()).parallelStream().map(ForkJoinTask::join)
             .collect(Collectors.toConcurrentMap(keyMapper, valueMapper));

            //.forEach(entry -> result.put( entry.getKey(), (Long) entry.getValue()));  
        } 
        return  search();
    }

    private List<SearchTask2> createSubtasks() {
        final short lastKey = dataMap.lastKey();
        final short midkey = (short) (lastKey / 2);
        final short firstKey = dataMap.firstKey();
        final List<SearchTask2> dividedTasks = new ArrayList<>();
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(firstKey, true, midkey, false)), fromRange,
                toRange, fromInclusive, toInclusive));
        dividedTasks.add(new SearchTask2(new HashMap<>(dataMap.subMap(midkey, true, lastKey, true)), fromRange, toRange,
                fromInclusive, toInclusive));
        return dividedTasks;
    }

    private HashMap<Short,Long> search(){
        //My Search logic for values
        return new HashMap<>();
    }
}

有人可以帮助我keyMapper并为我的结果地图“valueMapper”吗,我试过了Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())

但它向我显示了一个错误

Cannot infer type argument(s) for <R, A> collect(Collector<? super T,A,R>)
4

1 回答 1

2

ForkJoinTask::join正在返回一张地图,所以你有一个地图流。您似乎期待一连串条目。您可以使用flatMap从映射流获取条目流,如下所示:

return ForkJoinTask.invokeAll(createSubtasks())
    .parallelStream()
    .map(ForkJoinTask::join)
    .flatMap(map -> map.entrySet().stream())   // you were missing this line
    .collect(
        Collectors.toConcurrentMap(entry -> entry.getKey(), entry -> entry.getValue())
    );

作为一个小小的改进,您还可以使用方法引用而不是您尝试的 lambda 表达式:

Collectors.toConcurrentMap(Entry::getKey, Entry::getValue)
于 2018-04-04T08:59:52.257 回答