我正在努力寻找实现我的处理管道的最佳方式。
我的生产者将工作提供给 BlockingQueue。在消费者方面,我轮询队列,将我得到的内容包装在一个 Runnable 任务中,然后将其提交给一个 ExecutorService。
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
这并不理想;当然,ExecutorService 有自己的队列,所以真正发生的是我总是完全排空我的工作队列并填充任务队列,随着任务完成,任务队列会慢慢清空。
我意识到我可以在生产者端排队任务,但我真的不想那样做——我喜欢我的工作队列的间接/隔离是哑字符串;这真的与制片人无关,他们会发生什么。强制生产者对 Runnable 或 Callable 进行排队会破坏抽象,恕我直言。
但我确实希望共享工作队列代表当前的处理状态。如果消费者跟不上,我希望能够阻止生产者。
我很想使用 Executors,但我觉得我在与他们的设计作斗争。我可以喝部分 Kool-ade,还是必须一饮而尽?我在抵制排队任务方面是不是走错路了?(我怀疑我可以设置 ThreadPoolExecutor 以使用 1 任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕。)
建议?