12

我正在从队列服务器获取数据,我需要对其进行处理并发送确认。像这样的东西:

while (true) {
    queueserver.get.data
    ThreadPoolExecutor //send data to thread
    queueserver.acknowledgement 

我不完全理解线程中发生了什么,但我认为该程序获取数据,将其发送到线程,然后立即确认。因此,即使我限制每个队列只能有 200 个未确认的项目,它也会尽可能快地接收它。当我在单个服务器上编写程序时这很好,但是如果我使用多个工作人员,那么这就会成为一个问题,因为线程队列中的项目数量并不是它完成的工作的反映,而是它的速度有多快可以从队列服务器获取项目。

如果线程队列满了工作,我能做些什么来让程序等待吗?

4

4 回答 4

27

如果需要处理的数据太多,如何让 ThreadPoolExecutor 命令等待?

BlockingQueue您可以使用带有限制的a 而不是开放式队列:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);

就提交给 的作业而言,您可以创建自己的ExecutorService,而不是使用使用无界队列创建的默认ExecutorServices :Executors

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

一旦队列填满,它将导致它拒绝任何提交的新任务。您将需要设置一个RejectedExecutionHandler提交到队列的。就像是:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
      // check afterwards and throw if pool shutdown
      if (executor.isShutdown()) {
         throw new RejectedExecutionException(
              "Task " + r + " rejected from " + e);
      }
   }
});

我认为 Java 没有ThreadPoolExecutor.CallerBlocksPolicy.

于 2012-04-27T15:15:23.007 回答
3

如果您想要在工作人员开始处理任务时得到确认,您可以自定义ThreadFactory在执行实际工作之前从线程发送确认。或者您可以覆盖beforeExecute.ThreadPoolExecutor

如果您希望在新工作人员被释放以执行新任务时得到确认,我认为您可以ThreadPoolExecutor使用 aSynchronousQueue和 a初始化 a ThreadPoolExecutor.CallerRunsPolicy,或者使用您自己的调用者阻塞的策略。

于 2012-04-27T15:21:43.960 回答
0

首先,我认为您的态度是错误的,因为您在伪代码中所做的工作正忙于等待,您应该阅读 java 教程http://docs.oracle.com/javase/tutorial/essential/concurrency/中的并发教程

忽略这一点,我会为您提供繁忙等待的解决方案(不推荐):

     ExecutorService e1 =  Executors.newFixedThreadPool(20);
     while (true) {
          if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll());
          if (!myq.isEmpty()) e1.execute(myq.poll());
     }

笔记:

1.如其他答案所述,确保您的 myq 已同步。您可以扩展一些阻塞队列以确保同步正确。

2.您实现了一个可运行类,该类在服务迭代中执行您从服务器执行的操作,这些可运行类必须将 myq 作为构造函数的参数并将其保存为全局变量。

3.myq 获取runnables,在它的run 方法结束时,你必须确保runnable从myq 中删除自己。

于 2012-04-27T16:08:46.350 回答
0

拥有一个不会执行超过 200 个任务并等待任务完成后再提交 201 个任务的阻塞池怎么样。我已经在我的应用程序中使用信号量实现了它。您还可以通过将值传递给其构造函数来更改限制。

与@Gray 答案的唯一区别是,在这种情况下,很少有任何任务会被拒绝。信号量将使任何 201 任务等待,除非其他任务结束。尽管如此,我们有拒绝处理程序在任何拒绝的情况下将该任务重新提交给执行者。

private class BlockingPool extends ThreadPoolExecutor {
    private final Semaphore semaphore;      
    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){    
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                executor.execute(r);
            }
        });
        semaphore = new Semaphore(tasksAllowedInThreads);
    }

    @Override
    public void execute(Runnable task){
        boolean acquired = false;
        do{
            try{
                semaphore.acquire();
                acquired = true;
            } catch (final InterruptedException e){
                // log
            }
        } while (!acquired); // run in loop to handle InterruptedException
        try{
            super.execute(task);
        } catch (final RejectedExecutionException e){
            System.out.println("Task Rejected");
            semaphore.release();
            throw e;
        }
    }    

    @Override
    protected void afterExecute(Runnable r, Throwable t){
        super.afterExecute(r, t);
        if (t != null){
            t.printStackTrace();
        }
        semaphore.release();
    }
}

这有意义吗!

于 2015-07-18T07:23:26.197 回答