2

如以下链接所述:-

如何让 ThreadPoolExecutor 在排队之前将线程增加到最大值?

我将队列实现更改为在输入元素后返回 false。因此,每当将新任务插入队列时,都会为其创建一个新线程。

但是,当我使用放置的记录器大规模运行以下实现(Bis 系统测试)时,会产生一个新问题。

当一个任务来执行时,它会被插入到队列中,当队列返回 false 时,会创建一个新线程来执行它。池中当前存在的空闲线程不会被拾取。getTask()原因是从队列中选择任务的方法将任务分配给空闲线程。所以我的问题是如何改变这种行为,以便如果线程空闲,如何确保为空闲线程分配任务以执行而不是创建新线程?

下面的输出会让它更清楚:-

Task 46 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 47 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Task 48 ends
Active Count: 0 Pool Size : 3 Idle Count: 3 Queue Size: 0
Active Count: 1 Pool Size : 4 Idle Count: 3 Queue Size: 0
Task 49 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0
Task 50 ends
Active Count: 2 Pool Size : 5 Idle Count: 3 Queue Size: 0

代码文件如下:-

ThreadPoolExecutor 是 java 1.5 版本,因为我们在服务器机器上使用 1.5 并且无法升级它。

线程池执行器:-

 public void execute(Runnable command) {
    System.out.println("Active Count: " + getActiveCount()
            + " Pool Size : " + getPoolSize() + " Idle Count: "
            + (getPoolSize() - getActiveCount())+" Queue Size: "+getQueue().size());
          if (command == null)
              throw new NullPointerException();
          for (;;) {
              if (runState != RUNNING) {
                  reject(command);
                  return;
              }
              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                  return;
              if (workQueue.offer(command))
                  return;
              int status = addIfUnderMaximumPoolSize(command);
              if (status > 0)      // created new thread
                  return;
              if (status == 0) {   // failed to create thread
                  reject(command);
                  return;
              }
              // Retry if created a new thread but it is busy with another task
          }
      }

LinkedBlockingQueue:-

public class CustomBlockingQueue<E> extends LinkedBlockingQueue<E> 
{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    public CustomBlockingQueue() {
    super(Integer.MAX_VALUE);
    }

    public boolean offer(E e) {
       return false;
    }

}

在拒绝处理程序中,我们正在调用我们尚未覆盖的队列的 put 方法

调用执行器

   final CustomThreadPoolExecutor tpe = new CustomThreadPoolExecutor(3, 8, 0L, TimeUnit.MILLISECONDS, new MediationBlockingQueue<Runnable>(), new MediationRejectionHandler());

private static final int TASK_COUNT = 100;

 for (int i = 0; i < TASK_COUNT; i++) {

......
tpe.execute(new Task(i));
.....
}

我们调用核心池大小为 3 的执行器,最大池大小为 8,并为任务使​​用无界链接阻塞队列。

4

4 回答 4

1

我相信您的问题如下:

public boolean offer(E e) {
   return false;
}

这将始终返回false到 TPE,这将导致它启动另一个线程,而不管当前有多少线程处于空闲状态。这不是我在此答案中的代码示例所建议的。我不得不在反馈后纠正它的早期问题。

我的回答是让你的offer(...)方法看起来像:

public boolean offer(Runnable e) {
    /*
     * Offer it to the queue if there is 1 or 0 items already queued, else
     * return false so the TPE will add another thread.
     */
    if (size() <= 1) {
        return super.offer(e);
    } else {
        return false;
    }
}

因此,如果队列中已经有 2 个或更多的东西,它将派生另一个线程,否则它将把任务排入队列中,该任务应该由空闲线程拾取。你也可以玩1价值。尝试使用0或超过1可能适合您的应用程序。将该值注入您的CustomBlockingQueue可能是有序的。

于 2013-10-30T22:04:19.653 回答
1

使用SynchronousQueue. 当且仅当已经有等待接收者时,它才会接受提供的项目。因此空闲线程将获取项目,一旦没有空闲线程,ThreadPoolExecutor将启动新线程。

唯一的缺点是一旦所有线程都启动了,您不能简单地将待处理的项目放入队列中,因为它没有容量。因此,您要么必须接受提交者被阻止,要么需要另一个队列来将待处理的任务放入它,并需要另一个后台线程尝试将这些待处理的项目放入同步队列。这个额外的线程不会影响性能,因为它大部分时间都被这两个队列中的任何一个阻塞。

class QueuingRejectionHandler implements RejectedExecutionHandler {

  final ExecutorService processPending=Executors.newSingleThreadExecutor();

  public void rejectedExecution(
      final Runnable r, final ThreadPoolExecutor executor) {
    processPending.execute(new Runnable() {
      public void run() {
        executor.execute(r);
      }
    });
  }
}

…</p>

ThreadPoolExecutor e=new ThreadPoolExecutor(
  corePoolSize, maximumPoolSize, keepAliveTime, unit,
  new SynchronousQueue<Runnable>(), new QueuingRejectionHandler());
于 2013-10-30T12:09:56.340 回答
0

Gray在这里给出的解决方案很棒,但是我遇到了和你一样的问题,即理想的线程没有被用来挑选新的任务,但是如果 poolSize 小于 maxPoolSize,就会创建新的线程。

因此,我尝试通过复制完整的类(不是一个好主意,但找不到任何其他解决方案)并使用 ThreadPoolExecutor 扩展它并覆盖执行方法来调整 ThreadPoolExecutor 本身的功能。

下面是方法:

public void execute(Runnable command)
{
 System.out.println("ActiveCount : " + this.getActiveCount()
            + " PoolSize : " + this.getPoolSize() + " QueueSize : "
            + this.getQueue().size());

if (command == null)
    throw new NullPointerException();
for (;;)
{
    if (runState != RUNNING)
    {
    reject(command);
    return;
    }
    if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
    return;
    //Now, it will never offer to queue but will go further for thread creation.
    //if (workQueue.offer(command))
    //return;


    //This check is introduced to utilized ideal threads instead of creating new thread 
    //for incoming tasks.
    //Example : coreSize = 3, maxPoolSize = 8.
    //activeCount = 4, and PoolSize = 5, so 1 thread is ideal Currently queue is empty. 
    //When new task comes, it will offer that to queue, and getTask() will take care and execute the task.
    //But if new task comes, before ideal thread takes task from queue, 
    //activeCount = 4, and PoolSize = 5, so 1 thread is ideal Currently queue size = 1.
    //this check fails and new thread is created if poolsize under max size or 
    //task is added to queue through rejection handler.

    if ((this.getPoolSize() - this.getActiveCount()) > 0 && 
        (this.getPoolSize() - this.getActiveCount() - workQueue.size()) > 0)
    {
    workQueue.offer(command);
    return;
    }
    int status = addIfUnderMaximumPoolSize(command);
    if (status > 0) // created new thread
    return;
    if (status == 0)
    { // failed to create thread
    reject(command);
    return;
    }
    // Retry if created a new thread but it is busy with another task
}
}

在拒绝处理程序中,我使用 put 方法将任务放入队列(无界)中,正如 Gray 所建议的那样。:)

注意:我没有在我的代码中覆盖 Queue 的行为。

于 2013-10-30T12:12:30.073 回答
0

所以我的问题是如何改变这种行为,以便如果线程空闲,如何确保为空闲线程分配任务以执行而不是创建新线程?

在过去的几年里,情况有了很大的改善。Executors 使用 Java 8 newWorkStealingPool API可以轻松解决您的问题

新工作窃取池

public static ExecutorService newWorkStealingPool()

创建一个工作窃取线程池,使用所有可用处理器作为其目标并行度级别。

ExecutorService executorService = Executors.newWorkStealingPool();

会为你做所需的魔法。newWorkSteatingPool将返回一个ExecutorService类型ForkJoinPool。在ForkJoinPool,Idle threads will steal task from busy thread's queue中,您正在寻找。

于 2016-02-29T02:21:04.080 回答