2

我有一个无限的作业队列,可以异步处理。每个作业的处理可能会或可能不会触发为此队列创建新作业。

我想要一个由多个工作线程组成的池来从这个队列中获取项目并并行处理它们,直到队列都为空并且所有工作线程都空闲等待队列上的新作业(因为忙碌的工作人员最终可能会添加新的作业到队列中)。

是否有使用java.util.concurrent实现的方法可以用来解决这个特定问题,其中工人也是生产者?尚不清楚 API 是否以直接的方式支持这种情况。

特别是,我希望能够检测到终止条件,即当没有更多作业可用(空作业队列)并且不会再产生作业(所有空闲工作线程)时。

编辑

Nam San 下面的回答似乎是最优雅的方法,它基本上归结为跟踪提交的作业数量与已完成作业的数量,并使用这些数字相等的情况作为终止条件。

我已经实现了一个完整的例子,使用java.util.concurrent扩展ThreadPoolExecutor来实现这一点,加上专门的作业队列来接受Comparable以特定方式排序的实例。

  • TestExecutor.java:一个自定义执行器,它扩展ThreadPoolExecutor但具有执行可能创建新作业的作业的附加方法,以及一个等待所有提交作业完成的新 await 方法。
  • WorkUnit.java:一个可比较的可运行作业的示例,它可以创建新的作业以提交给TestExecutor.
  • Test.java:包含一个 main 方法来运行一个使用WorkUnit带有TestExecutor.
4

5 回答 5

1

我认为消费者也是生产者并不重要,因为在生产者-消费者模式中,它们是完全独立的关注点。

您的消费者已经拥有对队列的引用 - 只需让他们像生产者一样添加到队列中。

您可以使用 aAtomicInteger或类似的方法来记录当前有多少工作人员处于活动状态,或者CountDownLatch如果您想等到他们全部静止,请使用 a。

于 2012-10-22T06:40:58.860 回答
1

对于这类问题,我已经看到了一些不同的解决方案。

一种是仍然poll在主线程中用作阻塞调用,就像在您的代码中一样,但是在可能永远等待的情况下将来自工作人员的“虚拟”对象排入队列以唤醒主线程。例如,任何在没有向队列添加更多项目的情况下完成的工作人员应该提交一个虚拟作业,主线程识别并忽略它(它仅用于唤醒主线程)。通过跟踪活动作业的数量,您可以创建更少的虚拟对象,从而减少“虚假唤醒”,从而减少“虚假唤醒”——只有最后一个作业需要添加虚拟对象。

另一种方法是等待不同的对象。例如,任何老人Object都可以。wait()在这个对象上有主线程。Object.notify()然后作业使用它们完成的任何时间唤醒这个线程。同样,通过计数,您可以减少所需通知的数量。

最优雅的解决方案可能是使用Semaphore. 基本上,信号量的值将是“飞行作业+队列项目”数量的负数。当一个作业从队列中取出一个项目时,这个值不会改变(因为飞行中的作业增加一个,而队列项目减少一个),但是每个作业都应该为他们添加的每个作业调用 reducePermits(),并且在他们完成之前进行一次 release() 调用。

然后主线程可以acquire()在工作期间阻塞。当它醒来时,一切都完成了(因为在飞行中+排队的工作为零)。您将启动另一个线程来实际执行调用并添加作业(目前由主线程完成),并且当主线程返回poll时可以关闭该工作人员。但是,让现有的工人自己而不是完成acquire可能更简单。poll()那么你根本不需要这个传递函数。

事实上,有了Semaphore解决方案,为什么不完全放弃队列,而使用内置在执行器中的队列呢?也就是说,工人是否通过 ? 提交了新工作executor.submit(newJob(nextJob))?无论如何,执行器线程在内部都在从阻塞队列中提取工作,因此在具有显式外部队列方面存在一些重复。

于 2012-10-24T07:27:26.550 回答
1

下面的代码演示了如何使用包装类Executor来计算提交的作业数并将其与已完成的作业数进行比较以实现您想要的结果。请注意,您的任务必须调用execute包装类的方法,并且永远不要Executor直接调用底层。如果需要,扩展下面的包装器以包装 an 的“提交”方法应该是微不足道的ExecutorService

public class ExampleExecutor {

    private final Executor executor;
    private long submitCount = 0;
    private long doneCount = 0;

    public ExampleExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(Collection<Runnable> commands) {
        for (Runnable command : commands) {
            execute(command);
        }
    }

    public synchronized void execute(final Runnable command) {
        submitCount ++;

        executor.execute(new Runnable() {
            public void run() {
                try {
                    command.run();
                } finally {
                    synchronized (ExampleExecutor.this) {
                        doneCount++;
                        if (doneCount == submitCount) {
                            ExampleExecutor.this.notifyAll();
                        }
                    }
                }
            }
        });
    }

    public synchronized void awaitCompletion() throws InterruptedException {
        while (doneCount != submitCount) {
            this.wait();
        }
    }
}

编辑:在下面添加了测试用例来演示如何使用上述代码

public class Test {

    static class Task implements Runnable {
        private final String id;
        private final long repetitions;
        private final long respawnSize;
        private final ExampleExecutor executor;

        public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
            this.id = id;
            this.repetitions = repetitions;
            this.respawnSize = respawnSize;
            this.executor = executor;
        }

        public void run() {
            for (int i = 0; i < respawnSize; i ++) {
                // Spawning new sub tasks
                executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
            }

            double sum = 0;
            for (int i = 0; i < repetitions; i++) {
                sum += Math.sin(i);
            }

            System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
        }
    }

    public static void main(String argv[]) throws InterruptedException {
        ExampleExecutor executor = new ExampleExecutor(Executors.newFixedThreadPool(2));
        executor.execute(new Task("0", 2000000, 100, executor));

        System.err.println("main thread awaits completion");
        executor.awaitCompletion();
        System.err.println("main thread recieved completion event");
    }
}
于 2012-10-28T09:08:58.030 回答
1

请参阅我在Directory Scanner上的帖子它可以满足大多数要求。但它没有用 Futures 和 Callable 实现。得想一想。每个任务都没有被赋予重要性。没有结果并且产生异常。它只是一种扫描文件的并行和递归方式。

于 2012-10-27T04:18:11.427 回答
1

几年前,我不得不做一些类似的事情,但有界的堆栈。我将分享一个可能的解决方案:

idle_thread = MAX_THREAD;
do
{
    if(queue != empty) // If thread have work to do
    {
       idle_threads--;  // Count this threads was a worker   
       flag = true;
       while(queue != empty)  // Until queue have work
       {
          synchronized(this)
          {
            // task =  take_out_of_queue;
          }
        }
   }
   if(flag) // This flag must to be local to each thread, it is use to insure 
   {        // that threads will count this only one time for each time 
          // the queue got empty
         synchronized(this)
         {
            if(flag == false)
            idle_threads++;  // Count thread as a idle one
            flag = false;
         }
     }
     if(idle_threads == MAX_THREADS) out = true; // When all threads are idle stop the work loop
} while(!out)
于 2012-10-26T22:25:14.237 回答