
I designed a RecursiveTask.

Here is the code for my task.

public class SearchTask extends RecursiveTask<Map<Short, Long>> {

private static final long serialVersionUID = 1L;
private int majorDataThreshold = 16001;
private ConcurrentNavigableMap<Short, Long> dataMap;
private long fromRange;
private long toRange;
private boolean fromInclusive;
private boolean toInclusive;

public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
        final boolean fromInclusive, final boolean toInclusive) {
    this.dataMap = new ConcurrentSkipListMap<>(dataSource);
    this.fromRange = fromRange;
    this.toRange = toRange;
    this.fromInclusive = fromInclusive;
    this.toInclusive = toInclusive;
}

@Override
protected Map<Short, Long> compute() {
    final int size = dataMap.size();
    // This is not a perfect RecursiveTask, because the if condition is designed to overcome a stackoverflow error when map filled with 32k data
    if (size > majorDataThreshold+1000) {
        // List<SearchTask> tasks = createSubtasks();
        // tasks.get(0).fork();
        // tasks.get(1).fork();

        // Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
        // map.putAll(tasks.get(1).join());
        // return map;

        return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
    }
    return search();
}

private List<SearchTask> createSubtasks() {
    final short lastKey = dataMap.lastKey();
    final short midkey = (short) (lastKey / 2);
    final short firstKey = dataMap.firstKey();
    final List<SearchTask> dividedTasks = new ArrayList<>();
    dividedTasks.add(
            new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
                    fromRange, toRange, fromInclusive, toInclusive));
    dividedTasks
            .add(new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
                    fromRange, toRange, fromInclusive, toInclusive));
    return dividedTasks;
}

private Map<Short, Long> search() {
    final Map<Short, Long> result = dataMap.entrySet().stream()
            .filter(serchPredicate(fromRange, toRange, fromInclusive, toInclusive))
            .collect(Collectors.toConcurrentMap(p -> p.getKey(), p -> p.getValue()));
    return result;
}

private static Predicate<? super Entry<Short, Long>> serchPredicate(final long fromValue, final long toValue,
        final boolean fromInclusive, final boolean toInclusive) {
    if (fromInclusive && !toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() < toValue);
    else if (!fromInclusive && toInclusive)
        return p -> (p.getValue() > fromValue && p.getValue() <= toValue);
    else if (fromInclusive && toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() <= toValue);
    else
        return p -> (p.getValue() > fromValue && p.getValue() < toValue);
}

The maximum amount of data this task handles is 32000 (32k).

In the code, I split the task if the size passes the threshold:

 if (size > majorDataThreshold)

When I try to lower majorDataThreshold below 16001, I get an error.

Stack trace

at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackOverflowError
    ... 1024 more
Caused by: java.lang.StackOverflowError
    ... 1024 more
    .................Same trace
Caused by: java.lang.StackOverflowError
    at java.util.Collection.stream(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
    at java.util.concurrent.RecursiveTask.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)

To work around this, I tried:

Collectors.toMap()
ConcurrentHashMap
joining manually

The problem is still not solved.

Can someone help me figure out what is wrong with my RecursiveTask?

Unit test code

public class Container32kUniqueDataTest {

private ForkJoinRangeContainer forkJoinContianer;

@Before
public void setUp(){
    long[] data = genrateTestData();
    forkJoinContianer = new ForkJoinRangeContainer(data);
}

private long[] genrateTestData(){
    long[] data= new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i]=i+1;
    }
    return data;
}

@Test
public void runARangeQuery_forkJoin(){
    Set<Short> ids = forkJoinContianer.findIdsInRange(14, 17, true, true);
    assertEquals(true, ids.size()>0);
}
}

A trimmed-down version of the container code

public class ForkJoinRangeContainer {

private Map<Short, Long> dataSource = new HashMap<Short, Long>();

public ForkJoinRangeContainer(long[] data) {
    populateData(data);
}

private void populateData(final long[] data) {
    for (short i = 0; i < data.length; i++) {
        dataSource.put(i, data[i]);
    }
}

public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
    Map<Short, Long> map = forkJoinPool.invoke(task);
    forkJoinPool.shutdown();
    return map.keySet();
}

public static void main(String[] args) {

    long[] data = new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i] = i + 1;
    }
    ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
    Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
    if (ids.size() > 0) {
        System.out.println("Found Ids");
    }
}
}

1 Answer


You are stuck in a never-ending loop at SearchTask's return ForkJoinTask.invokeAll(createSubtasks()).

createSubtasks() creates subtasks with the same values over and over, because you never reduce the size of dataMap.

F/J works by splitting an object into Left and Right. Each Left and Right creates new Lefts and Rights with half the values. This halving continues until you hit your threshold for "doing the work."
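One concrete way the posted split can fail to shrink (my reading of the question's code, sketched here as an assumption): midkey is computed as lastKey / 2 rather than as the midpoint of the current key range. A right-hand submap whose firstKey already equals lastKey / 2 then splits into an empty left half and an unchanged right half, forever. A minimal sketch of a midpoint helper that always halves the range:

```java
public class MidpointSketch {

    // Hypothetical helper: midpoint of the *current* key range.
    // The question's code uses lastKey / 2, so a submap whose firstKey
    // already equals lastKey / 2 splits into an empty left half and an
    // unchanged right half, recursing until the stack overflows.
    static short midpoint(short firstKey, short lastKey) {
        return (short) ((firstKey + lastKey) / 2);
    }

    public static void main(String[] args) {
        // First split of keys 0..31999:
        short mid1 = midpoint((short) 0, (short) 31999);         // 15999
        // Right-hand submap 15999..31999: lastKey / 2 is still 15999,
        // so that split makes no progress; the range midpoint does.
        short brokenMid = (short) (31999 / 2);                   // 15999 again
        short fixedMid = midpoint((short) 15999, (short) 31999); // 23999
        System.out.println(mid1 + " " + brokenMid + " " + fixedMid);
    }
}
```

With 32000 keys, the first split leaves a right submap of roughly 16001 entries that never gets smaller, which would match the StackOverflowError appearing once the threshold drops below that size.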

The first lesson I learned in programming is: keep it simple.

You are mixing Map, ArrayMap, ConcurrentSkipListMap, ConcurrentNavigableMap, List, stream.Collectors, HashMap, and Set with the F/J classes. At the very least this is confusing, it makes the code hard to follow, and it usually leads to failure. Simpler is better.

When you create a List for ForkJoinTask.invokeAll(), create the whole List once, before the invoke(). The List should contain all the subtasks needed to complete the work, each subtask with half the values of the previous one. Don't use streams; you don't have a stream, just a few subtasks in a List.
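The build-the-List-then-invokeAll-then-merge-with-a-plain-loop pattern can be sketched like this (the task and names are illustrative, not the asker's code; it sums an array slice instead of filtering a map):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class InvokeAllSketch {

    // Illustrative task: sums a slice of an array, splitting until small.
    static class SumTask extends RecursiveTask<Long> {
        private final long[] data;
        private final int from, to; // each split halves [from, to)

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1000) { // threshold: do the work directly
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = from + (to - from) / 2;
            // Build the whole list of subtasks once, then invokeAll.
            List<SumTask> tasks = new ArrayList<>();
            tasks.add(new SumTask(data, from, mid));
            tasks.add(new SumTask(data, mid, to));
            ForkJoinTask.invokeAll(tasks);
            long total = 0;
            for (SumTask t : tasks) total += t.join(); // plain loop, no streams
            return total;
        }
    }

    public static void main(String[] args) {
        long[] data = new long[32000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // prints 512016000, the sum of 1..32000
    }
}
```

The key point is that each subtask's range is strictly half of its parent's, so recursion is guaranteed to reach the threshold.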

Alternatively, split into Left and Right and do Left.fork(), Right.fork(). Each forked task then splits again with half the values, and so on.
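That fork-both-halves pattern is commonly written with one fork() and one direct compute(), so the current thread does half the work itself. A sketch under the same illustrative setup (names are mine, not from the question):

```java
import java.util.concurrent.RecursiveTask;

public class ForkSketch {

    // Illustrative task: counts the elements in [from, to) by halving.
    static class CountTask extends RecursiveTask<Integer> {
        private final int from, to;

        CountTask(int from, int to) { this.from = from; this.to = to; }

        @Override
        protected Integer compute() {
            if (to - from <= 8) {
                return to - from; // small enough: do the work
            }
            int mid = from + (to - from) / 2;
            CountTask left = new CountTask(from, mid);
            CountTask right = new CountTask(mid, to);
            left.fork();                       // run the left half asynchronously
            int rightResult = right.compute(); // compute the right half here
            return left.join() + rightResult;  // then wait for the left half
        }
    }

    public static void main(String[] args) {
        System.out.println(new CountTask(0, 1000).invoke()); // prints 1000
    }
}
```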

Exactly how you reduce the object dataMap's "size to be split" is up to you.

Answered 2018-04-06T17:00:23.413