3

我有以下课程:

WorkerTask.java

   public interface WorkerTask extends Task {

   // Constants
   public static final short WORKERTASK_SPIDER = 1;
   public static final short WORKERTASK_PARSER = 2;
   public static final short WORKERTASK_PRODUCT = 3;

   public int getType();
}

WorkerPool.java

class workerPool {

     private ThreadPoolExecutor executorPool_;

     //----------------------------------------------------  

     public WorkerPool(int poolSize) 
     {
        executorPool_ = new ThreadPoolExecutor(
           poolSize,5,10,TimeUnit.SECONDS,
           new ArrayBlockingQueue<Runnable>(10000000,false),
           Executors.defaultThreadFactory()
     );

     //----------------------------------------------------        

     public void assign(WorkerTask workerTask) {
         executorPool_.execute(new WorkerThread(workerTask));
     }

     //----------------------------------------------------  

     public void removeTasks(int siteID) {
        executorPool_.getQueue().removeIf(...);     
     }
}

我想调用方法 removeTasks 来删除一定数量的待处理任务,但我不知道如何使用方法 removeIf。它说:删除此集合中满足给定谓词的所有元素,但我不知道如何创建参数谓词。任何想法?

4

2 回答 2

2

如果你有一个Queue<WorkerTask>,你可以做这样的事情:

queue.removeIf(task -> task.getSiteID() == siteID)

有几个问题。一个问题是你得到的队列getQueue()BlockingQueue<Runnable>而不是Queue<WorkerTask>。如果您将Runnable实例提交到池中,则队列可能包含对您的实际任务的引用;如果是这样,您可以将它们降级为WorkerTask. 但是,这不能保证。此外,ThreadPoolExecutor的类文档说(在“队列维护”下):

方法getQueue()允许访问工作队列以进行监视和调试。强烈建议不要将此方法用于任何其他目的。提供了两种方法,当大量排队的任务被取消时remove(Runnable),它们可用于协助存储回收。purge()

看着remove(Runnable)方法,它的文档说

在放入内部队列之前,它可能无法删除已转换为其他形式的任务。

这表明您应该挂起Runnable已提交的实例以便remove()稍后调用它们。或者,调用submit(Runnable)获取Future并保存这些实例以取消它们。

但还有第二个问题可能使这种方法不够充分。假设您找到了一种从队列中删除或取消匹配任务的方法。另一个线程可能已决定提交匹配的新任务,但尚未提交。这里有一个竞争条件。您也许可以取消已入队的任务,但在您这样做之后,您不能保证没有提交新的匹配任务。

这是另一种方法。据推测,当您取消(或其他)站点 ID 时,在某处有一些逻辑可以停止提交与该侧 ID 匹配的新任务。问题是如何处理“进行中”的匹配任务,即在队列中或即将入队的任务。

与其尝试取消匹配的任务,不如更改任务,以便如果其站点 ID 已被取消,该任务将变为无操作。您可以将站点 ID 的取消记录在ConcurrentHashMap. 任何任务都会在开始工作之前检查此地图,如果存在站点 ID,它会简单地返回。将站点 ID 添加到地图将立即产生确保不会开始该站点 ID 上的新任务的效果。(已经开始的任务将一直运行到完成。)任何进行中的任务最终都会从队列中耗尽,而不会导致任何实际工作发生。

于 2016-01-12T06:30:23.707 回答
0

谓词是接收输入并返回布尔值的函数。

如果您使用的是 java 8,则可以使用 lambda 表达式: (elem) -> return elem.id == siteID

于 2016-01-11T11:29:00.793 回答