3

Consider a long-running computation inside a Callable instance.

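Also consider that the precision of the result may depend on how long the computation runs. That is, if the task is cancelled, it should return whatever it has computed so far (for example, a pipeline that computes an irrational number).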

Ideally this would be implemented with the standard Java concurrency utilities, e.g.

Callable<ValuableResult> task = new Callable<ValuableResult>() { ... }; // diamond on an anonymous class needs Java 9+
Future<ValuableResult> future = Executors.newSingleThreadExecutor().submit(task);
try {
    return future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException te) {
    future.cancel(true);
    // HERE! Get what was computed so far
}

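It seems this problem cannot be solved without completely reimplementing the Future and ThreadPoolExecutor interfaces. Are there any convenient existing tools for this in Java 1.7?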

4

3 Answers

1

Well, it seems to me that the simplest way in this case is to prepare a final ResultWrapper object that is passed into the Callable instance:

final ValuableResultWrapper wrapper = new ValuableResultWrapper();
final CountDownLatch latch = new CountDownLatch(1);

Callable<ValuableResultWrapper> task = new Callable<ValuableResultWrapper>() { 
   ... 
   wrapper.setValue(...); // here we set what we have computed so far
   latch.countDown();
   return wrapper;
   ...  
};
Future<ValuableResultWrapper> future = Executors.newSingleThreadExecutor().submit(task);
try {
    return future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException te) {
    future.cancel(true);
    // HERE! Get what was computed so far
    latch.await();
    return wrapper;
}

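UPD: In such an implementation (which is getting complicated) we have to introduce some kind of latch (a CountDownLatch in my example) to make sure the task has finished before we execute return wrapper;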
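The wrapper-plus-latch idea can be sketched as a complete program. This is only an illustration under assumptions: the names PartialResultDemo and sqrt2Within are hypothetical, an AtomicReference<Double> stands in for ValuableResultWrapper, and the "valuable" computation is a Newton iteration for the square root of 2. The latch wait is bounded here, because a task that is cancelled before it ever starts would never count the latch down.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original answer.
final class PartialResultDemo {

    /** Refines an approximation of sqrt(2) until the timeout, then returns the latest value. */
    static double sqrt2Within(long timeoutMillis) {
        // Shared holder (stand-in for ValuableResultWrapper) for the latest partial result.
        final AtomicReference<Double> wrapper = new AtomicReference<Double>(1.0);
        final CountDownLatch finished = new CountDownLatch(1);

        Callable<Double> task = new Callable<Double>() {
            @Override
            public Double call() {
                try {
                    double x = 1.0;
                    // Keep refining until cancel(true) interrupts this thread.
                    while (!Thread.currentThread().isInterrupted()) {
                        x = (x + 2.0 / x) / 2.0; // one Newton step
                        wrapper.set(x);          // publish what we have so far
                    }
                    return x;
                } finally {
                    finished.countDown();        // signal that no more updates will follow
                }
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Double> future = executor.submit(task);
        try {
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                future.cancel(true);
                // Bounded wait: the latch is never released if the task did not start.
                finished.await(1, TimeUnit.SECONDS);
                return wrapper.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(sqrt2Within(50));
    }
}
```

The task has to cooperate: it checks the interrupt flag in its loop and counts the latch down in a finally block, otherwise the caller would read the wrapper while the task may still be writing to it.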

answered 2014-07-29T11:55:06.927
1

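Instead of cancelling it through the Future API, tell it to finish through your own mechanism (for example, a long that you pass to the constructor and that tells the task how long to run before returning normally, or an AtomicBoolean that you set to true).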

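Keep in mind that once the task has actually started, cancel(true) does not magically stop it. All it does is interrupt the thread. Some methods check this flag and throw InterruptedException; otherwise you have to check the isInterrupted flag yourself. So, since you need to write that cooperative mechanism anyway, why not make it fit your needs better?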
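A minimal sketch of this approach, assuming an AtomicBoolean stop flag: the task returns normally once the flag is set, so Future.get() hands back the partial value and no cancellation is involved. The names CooperativeStop, PiTask and approximatePi are hypothetical, and the Leibniz series for pi is just a stand-in for the real computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original answer.
final class CooperativeStop {

    /** Approximates pi with the Leibniz series until it is asked to stop. */
    static final class PiTask implements Callable<Double> {
        private final AtomicBoolean stop;

        PiTask(AtomicBoolean stop) {
            this.stop = stop;
        }

        @Override
        public Double call() {
            double sum = 0.0;
            long k = 0;
            // The stop flag is the main signal; the interrupt check is a fallback
            // in case the executor is shut down with shutdownNow().
            while (!stop.get() && !Thread.currentThread().isInterrupted()) {
                sum += ((k % 2 == 0) ? 4.0 : -4.0) / (2 * k + 1);
                k++;
            }
            return sum; // normal return, so Future.get() yields the partial value
        }
    }

    /** Lets the task run for roughly the given time, then asks it to finish. */
    static double approximatePi(long millis) {
        AtomicBoolean stop = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Double> future = executor.submit(new PiTask(stop));
            Thread.sleep(millis); // time budget for the computation
            stop.set(true);       // cooperative request to finish
            return future.get();  // the task returns shortly after seeing the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(approximatePi(100));
    }
}
```

Because the task ends by returning rather than by being cancelled, the standard Future is enough and nothing in the executor framework has to be reimplemented.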

answered 2014-07-29T11:50:09.430
0

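CompletionService is more powerful than FutureTask alone, and in many cases it is the better fit. I took some ideas from it to solve this problem. Its implementing class ExecutorCompletionService is also simpler than FutureTask: it contains only a few lines of code and is easy to read. So I modified that class to make the partially computed result available. For me it is a satisfying solution; it looks simple and clear.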

Demo code:

CompletionService<List<DeviceInfo>> completionService =
        new MyCompletionService<>(Executors.newCachedThreadPool());
Future<List<DeviceInfo>> task = completionService.submit(detector);
try {
    LogHelper.i(TAG, "result 111: ");
    // take() blocks until a submitted task is done (completed or cancelled)
    Future<List<DeviceInfo>> result = completionService.take();
    LogHelper.i(TAG, "result: " + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Here is the class code:

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * A CompletionService like java.util.concurrent.ExecutorCompletionService, except that the
 * partially computed result can still be read from the FutureTask returned by submit,
 * even after that task has been cancelled or interrupted.
 * In addition, the service ensures that a Future obtained from take or poll is done.
 */
public class MyCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

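    private static class DoneFutureTask<V> extends FutureTask<V> {
        // Last value returned by the task; volatile so that a thread calling
        // get() sees the value written by the worker thread.
        private volatile V outcome;

        DoneFutureTask(Callable<V> task) {
            super(task);
        }

        DoneFutureTask(Runnable task, V result) {
            super(task, result);
        }

        @Override
        protected void set(V v) {
            // If the task was cancelled but its code still returned a value,
            // super.set() discards that value, so keep our own copy.
            super.set(v);
            outcome = v;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return super.get();
            } catch (CancellationException e) {
                // Cancelled: fall back to whatever the task returned,
                // or null if it has not returned anything.
                return outcome;
            }
        }

    }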

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        return new DoneFutureTask<V>(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        return new DoneFutureTask<V>(task, result);
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public MyCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public MyCompletionService(Executor executor,
                               BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
answered 2018-10-12T18:19:03.697