6

当我运行以下代码时,8 个可用线程中只有 2 个运行,谁能解释为什么会这样?如何更改代码以利用所有 8 个线程?

Tree.java

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}

forEach(我使用or没关系reduce)。

Main.java

package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}

在该main方法中,我创建了一个具有以下结构的树:\

root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6

sendCommandAll函数处理每个子树(并行)只有当它的父完成处理时。但结果如下:

处理器:8
[main] 向 1
[main] 树发送命令,数据 1 为真
[main] 向 6
[ForkJoinPool.commonPool-worker-2] 向 5
[main] 树发送命令,数据 6 为真
[ForkJoinPool .commonPool-worker-2] 数据 5 的树为真
[ForkJoinPool.commonPool-worker-2] 向 4 发送命令
[ForkJoinPool.commonPool-worker-2] 数据 4 的树为真
[ForkJoinPool.commonPool-worker-2]向 3
[ForkJoinPool.commonPool-worker-2] 树发送命令,数据 3 为真
[ForkJoinPool.commonPool-worker-2] 向 2
[ForkJoinPool.commonPool-worker-2] 树发送命令,数据 2 为真

(作为记录,当我执行注释代码时Main.java,JVM 使用所有 7 (+ 1) 个可用线程commonPool

如何改进我的代码?

4

1 回答 1

4

如本答案后半部分所述,处理HashMaps 或s时的线程利用率HashSet取决于后备数组中元素的分布,这取决于哈希码。尤其是元素数量较少的情况下,与(默认)容量相比,这可能会导致糟糕的工作拆分。

一个简单的解决方法是使用new ArrayList<>(subTrees).parallelStream()而不是subTrees.parallelStream().

但请注意,您的方法在处理子节点之前执行当前节点的实际工作(在用 a 模拟的示例中sleep),这也降低了潜在的并行性。

您可以使用

public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] tree with data " + data + " got " + true);
}

这允许在处理子节点的同时处理当前节点。

于 2021-10-13T15:35:32.610 回答