1

我有一个多级管道。每个阶段都在一个单独的线程中运行,并使用有界 BlockingArrayQueues 进行通信。我正在尝试对最慢的阶段进行多线程处理以提高吞吐量。

问题:为此推荐的实现是什么?是否有一个库可以使这个实现更简单、更容易阅读?

输入队列 -> 阶段 1(4 个线程) -> 有界队列 -> 阶段 2

要求

  • 输入队列上的工作单元是独立的。

  • 工作单元是严格排序的——输出应该与输入的顺序相同。

  • 第 1 阶段必须受到限制——如果输出超过一定大小,它必须停止。

  • 阶段 1 中的异常应导致输出队列上出现“毒丸”并终止 ExecutorService。应尽最大努力丢弃排队的任务。

** 我建议的实现:**

我正在考虑使用线程数有限的 ThreadPoolExecutor。

将通过每个工作单元上的 CountDown 闩锁强制执行严格排序。如果前一个工作单元的锁存器为 0 并且队列上有空间,则线程只能推送结果。这也负责限制,因为线程将阻塞,直到输出队列上有空间。

class WorkUnit {
   CountDownLatch previousLatch;
   CountDownLatch myLatch;
}

class MyRunnable extends Runnable {
   public void run() {
       //do work...
       previousLatch.await();
       ouputQueue.put( result );
       myLatch.countDown();
   }
}

异常处理是我有点难过的地方。我正在考虑覆盖 ThreadPoolExecutor.afterExecute() 如果出现异常,它将调用 shutdownNow()。

class MyThreadPoolExecutor extends ThreadPoolExecutor {
      protected void afterExecute(Runnable r, Throwable t) {
           if(t != null) {
               //record exection, log, alert, etc
               ouput.put(POISON_PILL);
               shutdownNow();
           }
      }
}
4

1 回答 1

0

使用 ExecutorService 需要完全异步的设计,而不使用像CountDownLatch.countDown()or这样的阻塞操作BlockingQueue.take()。当动作 B 必须等待某些事件时,该事件应通过将 aRunnable提交给ExecutorService.

在您的情况下,您应该创建自定义类而不是队列。根据某些规则(例如,限制正在运行的 Stage1 任务的数量),这些类应该接受消息并将它们存储在内部,或者提交实现 Stage1 或 Stage2 的任务。

至于订购,请用序列号代替先前任务的参考。

于 2013-01-12T09:25:49.060 回答