3

考虑一个接受需要很长时间初始化的服务的配置设置的用户界面(例如,JDBC 连接的参数)。我们希望我们的用户界面在服务初始化发生时保持响应。如果用户进行其他更改,则应取消初始化并使用新参数重新启动。

因为在用户键入每个字符时参数都包含在配置中,因此可能会连续创建多个初始化请求。只有最后一个应该被执行。

我们已经编写了实现此结果的代码,但似乎这种行为非常适合作为 ExecutorService 实现。在我们将所有内容重构为 ExecutorService 之前,我想我会问世界上是否已经存在类似的实现。

更具体:

ExecutorService 将有一个工作线程。一旦提交了新任务,当前任务就会被取消(并且工作人员会中断)。然后捕获新任务以供下一次执行。如果提交了另一个任务,则再次取消当前任务,并将“下一次执行”任务设置为这个新任务。当工作线程最终选择下一个任务执行时,它始终是最后一个提交的任务——所有其他任务要么被取消,要么被丢弃。

有没有人愿意分享这样的实现?或者是否有一个涵盖这种行为的标准库?实现起来并不难,但是确定线程安全可能会很棘手,所以如果可以的话,我宁愿使用经过验证的代码。

4

2 回答 2

3

这是我最终想出的——我对任何评论都感兴趣:

public class InterruptingExecutorService extends ThreadPoolExecutor{
    private volatile FutureTask<?> currentFuture;

    public InterruptingExecutorService(boolean daemon) {
        super(0, 1, 1000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), 
                daemon ? new DaemonThreadFactory() : Executors.defaultThreadFactory());

    }

    public static class DaemonThreadFactory implements ThreadFactory{
        ThreadFactory delegate = Executors.defaultThreadFactory();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = delegate.newThread(r);
            t.setDaemon(true);
            return t;
        }

    }

    private void cancelCurrentFuture(){
        // cancel all pending tasks
        Iterator<Runnable> it = getQueue().iterator();
        while(it.hasNext()){
            FutureTask<?> task = (FutureTask<?>)it.next();
            task.cancel(true);
            it.remove();
        }

        // cancel the current task
        FutureTask<?> currentFuture = this.currentFuture;
        if(currentFuture != null){
            currentFuture.cancel(true);
        }
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();

        cancelCurrentFuture();
        if (!(command instanceof FutureTask)){ // we have to be able to cancel a task, so we have to wrap any non Future
            command = newTaskFor(command, null);
        }
        super.execute(command);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1
        // it isn't possible for currentFuture to be set by any other thread than the one calling this method
        this.currentFuture = (FutureTask<?>)r;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1
        // it isn't possible for currentFuture to be set by any other thread than the one calling this method
        this.currentFuture = null;
    }
}
于 2012-10-21T15:41:58.683 回答
1

您可能需要向执行者添加 DiscardOldestPolicy

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.DiscardOldestPolicy.html

You will get
0 submitted
1 submitted
2 submitted
3 submitted
4 submitted
5 submitted
6 submitted
7 submitted
8 submitted
9 submitted
9 finished

public static void main(String[] args) throws SecurityException,
        NoSuchMethodException {

    final Method interruptWorkers = ThreadPoolExecutor.class
            .getDeclaredMethod("interruptWorkers");
    interruptWorkers.setAccessible(true);

    ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L,
            TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
            new RejectedExecutionHandler() {

                @Override
                public void rejectedExecution(Runnable r,
                        ThreadPoolExecutor executor) {
                    if (!executor.isShutdown()) {
                        try {

                            interruptWorkers.invoke(executor);
                            executor.execute(r);

                        } catch (IllegalArgumentException e) {
                            e.printStackTrace();
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        } catch (InvocationTargetException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });


    for(int i =0 ;i<10;i++)
        executor.submit(newTask(i));
}

private static Runnable newTask(final int id) {
    return new Runnable() {
        {

            System.out.println(id + " submitted");
        }

        @Override
        public void run() {
            try {

                Thread.sleep(5000l);
                System.out.println(id + " finished");
            } catch (InterruptedException e) {
            }

        }

    };
}
于 2012-10-19T05:29:04.970 回答