3

所以我想我有点理解固定线程池是如何工作的(使用 Java 内置的 Executor.fixedThreadPool),但据我所知,通常有一定数量的工作你想要完成,当你启动时你知道有多少工作程序。例如

int numWorkers = Integer.parseInt(args[0]);
int threadPoolSize = Integer.parseInt(args[1]);
ExecutorService tpes =
    Executors.newFixedThreadPool(threadPoolSize);
WorkerThread[] workers = new WorkerThread[numWorkers];
for (int i = 0; i < numWorkers; i++) {
    workers[i] = new WorkerThread(i);
    tpes.execute(workers[i]);
}

每个 workerThread 做一些非常简单的事情,那部分是任意的。我想知道的是,如果你有一个固定的池大小(比如最大 8 个),但你不知道在运行时之前完成任务需要多少工作人员。

具体示例是:如果我的池大小为 8,并且我正在从标准输入读取数据。当我阅读时,我将输入分成设定大小的块。这些块中的每一个都被提供给一个线程(以及一些其他信息),以便它们可以对其进行压缩。因此,我不知道我需要创建多少个线程,因为我需要继续进行,直到我到达输入的末尾。我还必须以某种方式确保数据保持相同的顺序。如果线程 2 在线程 1 之前完成并且只是提交它的工作,我的数据将是乱序的!

那么在这种情况下线程池会是错误的方法吗?看起来会很棒(因为我一次不能使用超过 8 个线程)。

基本上,我想做这样的事情:

ExecutorService tpes = Executors.newFixedThreadPool(threadPoolSize);
BufferedInputStream inBytes = new BufferedInputStream(System.in);
byte[] buff = new byte[BLOCK_SIZE];
byte[] dict = new byte[DICT_SIZE];
WorkerThread worker;
int bytesRead = 0;

while((bytesRead = inBytes.read(buff)) != -1) {
   System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
   worker = new WorkerThread(buff, dict)   
   tpes.execute(worker);
}

这不是工作代码,我知道,但我只是想说明我想要什么。

我遗漏了一点,但是看看 buff 和 dict 的值是如何变化的,我不知道输入有多长。我认为我实际上不能这样做,因为在第一次通话后,工人已经存在了!我不能只说 worker = new WorkerThread 很多时间,因为它不是已经指向一个现有线程(真的,一个可能已经死的线程)并且显然在这个实现中,如果它确实有效,我就不会运行在平行下。但我的意思是,我想继续创建线程,直到达到最大池大小,等到一个线程完成,然后继续创建线程,直到我达到输入的结尾。

我还需要整理东西,这是真正令人讨厌的部分。

4

2 回答 2

1

WorkerThread您的解决方案完全没问题(唯一的一点是,如果您的 s 的工作量非常小,则可能不需要并行性)。

使用线程池,提交任务的数量无关紧要。池中的线程数可能少于或多于,线程池负责处理。

但是,这很重要:您依赖于 s 结果的某种顺序WorkerThread,但是在使用并行性时,不能保证这种顺序!不管你是否使用线程池,或者你有多少工作线程等等,你的结果总是有可能以任意顺序完成!

要保持顺序正确,请WorkerThread在其构造函数中为每个项指定当前项的编号,并让它们在完成后将结果按正确的顺序排列:

int noOfWorkItem = 0;
while((bytesRead = inBytes.read(buff)) != -1) {
   System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
   worker = new WorkerThread(buff, dict, noOfWorkItem++)   
   tpes.execute(worker);
}
于 2012-10-28T16:37:10.910 回答
1

正如@ignis 指出的那样,并行执行可能不是您情况的最佳答案。但是,要回答更一般的问题,除了 FixedThreadPool 之外,还有其他几个 Executor 实现需要考虑,其中一些可能具有您想要的特性。

至于保持秩序,通常您会将任务提交给执行者,并且对于每次提交,您都会获得一个 Future(这是一个承诺稍后在任务完成时为您提供结果的对象)。因此,您可以按照提交任务的顺序跟踪 Futures,然后当所有任务完成后,按顺序调用每个 Futures 上的 get() 以获取结果。

于 2012-10-28T16:29:49.987 回答