1

我正在尝试使用 JavaExecutorService及其功能编写一个程序invokeAll。我的问题是:该invokeAll功能是否同时解决了任务?我的意思是,如果我有两个处理器,会同时有两个工人吗?因为我无法使其正确缩放。如果我给newFixedThreadPool(2)或 1,完成问题需要相同的时间。

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map是一个实现Callable并且wp是部分解决方案的向量的类,该类在不同时间保存一些信息。

为什么不能缩放?可能是什么问题呢?

这是 PartialSolution 的代码:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName;//the name of a file
    public int b, e;//the index of begin and end of the fragment from the file
    public String info;//the fragment
    public HashMap<String, Word> hm;//here i retain the informations
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if(ok == true)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
             hmt = null;
        }    
    }
}

这是地图的代码:

public class Map implements Callable<PartialSolution>
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    StringBuffer sb = new StringBuffer(regex);
                    regex = sb.toString();
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}

所以在地图中,我从片段中取出每一行并搜索每个表达式的出现次数,我还保存了行数。在处理完所有片段后,在同一个 PartialSolution 中,我将信息保存在哈希图中并返回新的 PartialSolution。在下一步中,我将 PartialSolutions 与相同的文件名结合起来,并将它们引入一个 Callable 类 Reduce,它与 map 相同,不同之处在于它进行其他操作,但也返回一个 PartialSolution。

这是运行地图任务的代码:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

在任务中,我创建 Map 类型的任务,并在列表中获取它们。我不知道如何阅读 JVM 线程转储。我希望我给你的信息足够好。如果有帮助,我在 NetBeans 7.0.1 中工作。

谢谢你,亚历克斯

4

3 回答 3

2

我想知道的是方法invokeAll,如果我用10个线程创建了ExcutorService,会同时解决10个任务还是一次解决一个?

如果您将十个任务提交给具有十个线程的 ExecutorService,它将同时运行它们。它们是否可以完全并行并相互独立,取决于它们在做什么。但他们每个人都有自己的线程。

还有一个问题,如果我说 list.get(i).get() 这将在解决后返回 PartialSolution?

是的,它将阻塞直到计算完成(如果尚未完成)并返回其结果。

我真的不明白如果我使用 2 个线程而不是 1 个线程,为什么时间没有改善。

我们需要查看更多代码。他们是否在某些共享数据上同步?这些任务需要多长时间?如果它们很短,您可能不会注意到任何差异。如果它们需要更长的时间,请查看 JVM 线程转储以验证它们是否都在运行。

于 2011-11-29T09:46:50.010 回答
0

如果你用两个线程创建线程池,那么两个任务将同时运行。

我看到有两件事可能导致两个线程与一个线程花费相同的时间。

如果只有一个 Map 任务占用了您的大部分时间,那么额外的线程不会使该任务运行得更快。它不能比最慢的工作更快地完成。

另一种可能性是您的地图任务经常从共享向量中读取。这可能会导致足够的争用来抵消拥有两个线程的收益。

你应该在 jvisualvm 中提出这个来查看每个线程在做什么。

于 2011-11-29T22:07:15.093 回答
0

Java 8 在Executors 中引入了另外一种 API - newWorkStealingPool来创建工作窃取池。您不必创建RecursiveTaskRecursiveAction但仍然可以使用ForkJoinPool.

public static ExecutorService newWorkStealingPool()

创建一个工作窃取线程池,使用所有可用处理器作为其目标并行度级别。

默认情况下,它将以 CPU 内核数作为并行参数。如果你有核心 CPU,你可以有 8 个线程来处理工作任务队列。

Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.

如果您没有共享数据和共享锁定(同步)以及线程间通信,那么ExecutorService 或 ForkJoinPoolThreadPoolExecutor 性能会 很好。如果任务队列中的所有任务相互独立,性能会有所提高。

ThreadPoolExecutor构造函数来自定义和控制任务的工作流程:

 ThreadPoolExecutor(int corePoolSize, 
                       int maximumPoolSize, 
                       long keepAliveTime, 
                       TimeUnit unit, 
                       BlockingQueue<Runnable> workQueue, 
                       ThreadFactory threadFactory,
                       RejectedExecutionHandler handler)

查看相关的 SE 问题:

如何正确使用 Java Executor?

Java 的 Fork/Join vs ExecutorService - 什么时候使用?

于 2016-01-30T03:36:32.487 回答