我有一个多级管道。每个阶段都在一个单独的线程中运行,并使用有界 BlockingArrayQueues 进行通信。我正在尝试对最慢的阶段进行多线程处理以提高吞吐量。
问题:为此推荐的实现是什么?是否有一个库可以使这个实现更简单、更容易阅读?
输入队列 -> 阶段 1(4 个线程) -> 有界队列 -> 阶段 2
要求:
输入队列上的工作单元是独立的。
工作单元是严格排序的——输出应该与输入的顺序相同。
第 1 阶段必须受到限制——如果输出超过一定大小,它必须停止。
阶段 1 中的异常应导致输出队列上出现“毒丸”并终止 ExecutorService。应尽最大努力丢弃排队的任务。
** 我建议的实现:**
我正在考虑使用线程数有限的 ThreadPoolExecutor。
将通过每个工作单元上的 CountDown 闩锁强制执行严格排序。如果前一个工作单元的锁存器为 0 并且队列上有空间,则线程只能推送结果。这也负责限制,因为线程将阻塞,直到输出队列上有空间。
class WorkUnit {
CountDownLatch previousLatch;
CountDownLatch myLatch;
}
class MyRunnable extends Runnable {
public void run() {
//do work...
previousLatch.await();
ouputQueue.put( result );
myLatch.countDown();
}
}
异常处理是我有点难过的地方。我正在考虑覆盖 ThreadPoolExecutor.afterExecute() 如果出现异常,它将调用 shutdownNow()。
class MyThreadPoolExecutor extends ThreadPoolExecutor {
protected void afterExecute(Runnable r, Throwable t) {
if(t != null) {
//record exection, log, alert, etc
ouput.put(POISON_PILL);
shutdownNow();
}
}
}