0

设corePoolSize = 4,四次调用submit(或scheduleAtFixedRate等)方法后,查询填充,getActiveCount()方法返回正确值4,通过future.cancel(true)取消工作任务后getActiveCount( ) = 3,但是我认为新的提交(或 scheduleAtFixedRate 等)t call factory method Thread newThread(Runnable r) of ThreadFactory, before that was caused, and it是错误的,并且 getQueue() 在成功提交后为零,在没有明确创建的情况下也永远不会发生 RejectedExecutionException

public class ScheduledTaskCommandExecutor extends ScheduledThreadPoolExecutor {
  private static final TaskCommandThreadFactory factory;
  private static final ConcurrentSkipListMap<ScheduledFuture, String> activeTask;
  private final Semaphore semaphore;

  static {
    factory = new TaskCommandThreadFactory();
    activeTask = new ConcurrentSkipListMap<>();
  }

  public ScheduledTaskCommandExecutor(int corePoolSize) {
    super(4, factory, new RejectionHandler());
    //setMaximumPoolSize(corePoolSize);
    //allowCoreThreadTimeOut(true);
    semaphore = new Semaphore(corePoolSize);
    setRemoveOnCancelPolicy(true);
    setKeepAliveTime(10, TimeUnit.MILLISECONDS);
  }

  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    super.beforeExecute(t, r);
    activeTask.putIfAbsent((ScheduledFuture) r, t.getName());
    System.out.println(t.getName() + " " + ConsoleProperties.Message.TASK_IS_READY.toString());
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    try {
      long endTime = System.nanoTime();
    } finally {
      super.afterExecute(r, t);
      System.out.println(getTaskNameByFuture((ScheduledFuture) r) + " " + ConsoleProperties.Message.TASK_IS_COMPLETED.toString());
      activeTask.remove(r);
      //why getQueue()always empty after execute??
      for(Iterator<Runnable> iterator = getQueue().iterator();iterator.hasNext();) {
        System.out.println(iterator.toString());
      }
      //semaphore.release(1);
      purge();
    }
  }

  @Override
  protected void terminated() {
    try {

    } finally {
      super.terminated();
      System.out.println(ConsoleProperties.Message.EXECUTOR_TERMINATED);
    }
  }

  public TaskCommand execute(TaskCommand command) throws RejectedExecutionException {
    //semaphore.tryAcquire() tried that too
    if(getActiveCount() == getCorePoolSize()) {
      System.out.println(getActiveCount() + " " + getCorePoolSize());
      throw new RejectedExecutionException();
    }
    factory.setCommand(command);
    return command.setFuture(scheduleWithFixedDelay(command, command.getDelay(), command.getWaitInterval(), TimeUnit.MILLISECONDS));
  }

  @Override
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long delay, long period, TimeUnit timeUnit) {
    try {
      return super.scheduleAtFixedRate(runnable, delay, period, timeUnit);
    } catch (RuntimeException e) {
      System.out.println("threadNumber:" + e);
      //todo log
      throw e;
    }
  }

  public boolean cancelTaskByName(String name) {
    if (activeTask.containsValue(name)) {
      for (Map.Entry<ScheduledFuture, String> entry : activeTask.entrySet()) {
        if (name.equals(entry.getValue())) {
          return entry.getKey().cancel(true);
        }
      }
    }
    return false;
  }

  public String getState() {
    StringBuilder ret = new StringBuilder();
    ret.append("task count:").append(getTaskCount()).append("\nactive count:").append(getActiveCount()).append("\n");
    ret.append(Arrays.deepToString(activeTask.values().toArray()) + "\n");
    for (Map.Entry<ScheduledFuture, String> entry : activeTask.entrySet()) {
      ret.append("task " + entry.getValue() + " is done " + entry.getKey().isDone() + "\n");
    }
    //ret.append(Arrays.deepToString(executor.getQueue().toArray()) + "\n");
    return ret.toString();
  }

  private String getTaskNameByFuture(ScheduledFuture task) {
    for (Map.Entry<ScheduledFuture, String> entry : activeTask.entrySet()) {
      if (entry.getKey() == task) {
        return entry.getValue();
      }
    }
    return ConsoleProperties.Error.TASK_NOT_FOUND.toString();
  }

  private static class TaskCommandThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;
    private TaskCommand command;

    public void setCommand(TaskCommand command) {
      this.command = command;
    }

    TaskCommandThreadFactory() {
      SecurityManager s = System.getSecurityManager();
      group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
      namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    //this factory`s method don`t call after release full query
    @Override
    public Thread newThread(Runnable r) {
      int n = threadNumber.getAndIncrement();
      //System.out.println("threadNumber:" + n);
      command.setSerialNumber(n);
      Thread ret = new Thread(group, r, /*namePrefix +*/ command.getName(), 0);
      if (ret.isDaemon()) ret.setDaemon(false);
      if (ret.getPriority() != Thread.NORM_PRIORITY) ret.setPriority(Thread.NORM_PRIORITY);
      ret.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
          //todo log4j
          System.out.println(t.getName() + " : Error: " + e.getMessage());
        }
      });
      System.out.println("command thread number:" + command.getName());
      return ret;
    }
  }

  //and this will never occur this Exception without explicitly creating in  
  //public TaskCommand execute(TaskCommand command) method
  private static class RejectionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      //todo log4j
      System.out.println(r.toString() + " : Rejected");
      throw new RejectedExecutionException();
    }
  }
}

稍后在代码中的某处

ScheduledTaskCommandExecutor exec = new ScheduledTaskCommandExecutor(2);
Future futureOne = exec.execute(Runnable); // called method public Thread newThread(Runnable r) in ThreadFactory
why getQueue().size() is zero ?
getActiveCount() is 1;

Future futureTwo = exec.execute(Runnable); // called public Thread newThread(Runnable r) in ThreadFactory
why getQueue().size() is zero ?
getActiveCount() is 2;

Future futureThree = exec.execute(Runnable); // error because checking in  getActiveCount() == getCorePoolSize(),

up to this point everything is correct

futureOne.cancel(true);
getQueue().size() is still zero;
getActiveCount() is 1;

//again
Future futureThree = exec.execute(Runnable); // why now method newThread(Runnable r) in ThreadFactory has not called ?
4

2 回答 2

0

如果你只是想挂钩一个新任务的执行,尝试重写 beforeExecute 方法。

于 2014-03-20T20:38:19.603 回答
0

如果我理解这个问题(这有点令人困惑),这是理想的行为 - 你有一个池线程,所以你不需要创建一个新线程,并且执行程序只是查看空闲线程并重用它。

于 2014-03-20T18:58:36.720 回答