11

我正在努力寻找实现我的处理管道的最佳方式。

我的生产者将工作提供给 BlockingQueue。在消费者方面,我轮询队列,将我得到的内容包装在一个 Runnable 任务中,然后将其提交给一个 ExecutorService。

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

这并不理想;当然,ExecutorService 有自己的队列,所以真正发生的是我总是完全排空我的工作队列并填充任务队列,随着任务完成,任务队列会慢慢清空。

我意识到我可以在生产者端排队任务,但我真的不想那样做——我喜欢我的工作队列的间接/隔离是哑字符串;这真的与制片人无关,他们会发生什么。强制生产者对 Runnable 或 Callable 进行排队会破坏抽象,恕我直言。

但我确实希望共享工作队列代表当前的处理状态。如果消费者跟不上,我希望能够阻止生产者。

我很想使用 Executors,但我觉得我在与他们的设计作斗争。我可以喝部分 Kool-ade,还是必须一饮而尽?我在抵制排队任务方面是不是走错路了?(我怀疑我可以设置 ThreadPoolExecutor 以使用 1 任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕。)

建议?

4

3 回答 3

22

我希望共享工作队列代表当前的处理状态。

尝试使用共享的BlockingQueue 并让 Worker 线程池将工作项从队列中取出。

如果消费者跟不上,我希望能够阻止生产者。

ArrayBlockingQueueLinkedBlockingQueue都支持有界队列,这样它们会在满时阻塞。使用阻塞put()方法可确保在队列已满时阻塞生产者。

这是一个艰难的开始。您可以调整工作人员的数量和队列大小:

public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
于 2010-02-10T01:42:07.890 回答
1

“如果存在,则找到可用的现有工作线程,必要时创建一个,如果它们空闲则杀死它们。”

管理所有这些工人状态既不必要又危险。我会创建一个在后台持续运行的监控线程,唯一的任务就是填满队列并产生消费者......为什么不让工作线程守护进程,这样它们一完成就死掉?如果将它们全部附加到一个 ThreadGroup 中,则可以动态调整池的大小...例如:

  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**
于 2010-10-25T17:53:42.917 回答
0

您可以让您的消费者Runnable::run直接执行,而不是启动一个新线程。将此与最大大小的阻塞队列结合起来,我认为您将得到您想要的。您的消费者成为一名工作人员,根据队列中的工作项内联执行任务。他们只会在处理项目时尽快出列项目,以便您的生产者在您的消费者停止消费时。

于 2010-02-10T01:56:49.397 回答