7

我有棘手的情况,是否future.isDone()返回false,即使线程完成。

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataAccessor {
    private static ThreadPoolExecutor executor;
    private int timeout = 100000;
    static {
        executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<String>();
        for(int i=0; i<20; i++){
            requests.add("request:"+i);
        }
        DataAccessor dataAccessor = new DataAccessor();

        List<ProcessedResponse> results = dataAccessor.getDataFromService(requests);
        for(ProcessedResponse response:results){
            System.out.println("response"+response.toString()+"\n");
        }
        executor.shutdown();
    }

    public List<ProcessedResponse> getDataFromService(List<String> requests) {
        final CountDownLatch latch = new CountDownLatch(requests.size());
        List<SubmittedJob> submittedJobs = new ArrayList<SubmittedJob>(requests.size());
        for (String request : requests) {
            Future<ProcessedResponse> future = executor.submit(new GetAndProcessResponse(request, latch));
            submittedJobs.add(new SubmittedJob(future, request));
        }
        try {
            if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                // some of the jobs not done
                System.out.println("some jobs not done");
            }
        } catch (InterruptedException e1) {
            // take care, or cleanup
            for (SubmittedJob job : submittedJobs) {
                job.getFuture().cancel(true);
            }
        }
        List<ProcessedResponse> results = new LinkedList<DataAccessor.ProcessedResponse>();
        for (SubmittedJob job : submittedJobs) {
            try {
                // before doing a get you may check if it is done
                if (!job.getFuture().isDone()) {
                    // cancel job and continue with others
                    job.getFuture().cancel(true);
                    continue;
                }
                ProcessedResponse response = job.getFuture().get();
                results.add(response);
            } catch (ExecutionException cause) {
                // exceptions occurred during execution, in any
            } catch (InterruptedException e) {
                // take care
            }
        }
        return results;
    }

    private class SubmittedJob {
        final String request;
        final Future<ProcessedResponse> future;

        public Future<ProcessedResponse> getFuture() {
            return future;
        }

        public String getRequest() {
            return request;
        }

        SubmittedJob(final Future<ProcessedResponse> job, final String request) {
            this.future = job;
            this.request = request;
        }
    }

    private class ProcessedResponse {
        private final String request;
        private final String response;

        ProcessedResponse(final String request, final String response) {
            this.request = request;
            this.response = response;
        }

        public String getRequest() {
            return request;
        }

        public String getResponse() {
            return response;
        }

        public String toString(){
            return "[request:"+request+","+"response:"+ response+"]";
        }
    }

    private class GetAndProcessResponse implements Callable<ProcessedResponse> {
        private final String request;
        private final CountDownLatch countDownLatch;

        GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) {
            this.request = request;
            this.countDownLatch = countDownLatch;
        }

        public ProcessedResponse call() {
            try {
                return getAndProcessResponse(this.request);
            } finally {
                countDownLatch.countDown();
            }
        }

        private ProcessedResponse getAndProcessResponse(final String request) {
            // do the service call
            // ........
            if("request:16".equals(request)){
                throw (new RuntimeException("runtime"));
            }
            return (new ProcessedResponse(request, "response.of." + request));
        }
    }
}

如果我称它为future.isDone()返回值false,则coundownLatch.await()返回 true。任何想法?另请注意,发生这种情况时 countDownLatch.await 会立即出现。

如果您在此处发现格式不可读的视图,请访问http://tinyurl.com/7j6cvep

4

4 回答 4

9

这个问题很可能是时间问题之一。在关于 Future 的所有任务实际完成之前,闩锁将被释放(因为countDown()调用在call()方法内)。

您基本上是在重新创建CompletionService的工作(实现是ExecutorCompletionService),我建议改用它。您可以使用该poll(timeout)方法获得结果。只需跟踪总时间,并确保将每次通话的超时时间减少到总剩余时间。

于 2012-03-09T20:12:35.027 回答
3

正如 jtahlborn 所提到的,这可能是一种竞争条件,其中 CountdownLatch 向其等待线程发出信号,等待线程在 FutureTask 完成执行之前评估 Future 的取消条件(这将在 之后的某个时间发生countDown)。

您根本不能依赖 CountdownLatch 的同步机制与 Future 的同步机制同步。您应该做的是依靠 Future 告诉您何时完成。

你可以Future.get(long timeout, TimeUnit.MILLISECONDS)代替CountdownLatch.await(long timeout, TimeUnit.MILLISECONDS). 要获得与闩锁相同类型的效果,您可以将所有Futures 添加到 a List,遍历列表并获取每个 Future。

于 2012-03-10T11:01:36.237 回答
2

这是竞争条件的场景:

  • 主线程在latch.await,它在几毫秒内没有收到来自 Java 调度程序的 CPU 插槽
  • 子句countDownLatch.countDown()中最后一个执行线程调用finally
  • Java 调度程序决定给主线程更多的优先级,因为它等待了一段时间
  • 结果,当它请求最后一个Future结果时,它还不可用,因为最后一个执行线程没有时间片来传播结果,它仍然在finally...

我还没有找到关于 Java 调度程序如何真正工作的详细解释,可能是因为它主要取决于运行 JVM 的操作系统,但一般来说,它试图平均在一段时间内将 CPU 分配给可运行线程。这就是为什么主线程可以isDone在另一个离开finally子句之前到达测试。

我建议您在 之后更改结果的收集latch.await。如您所知,闩锁已降至零(除非主线程被中断),所有结果应该很快就会可用。带有超时的 get 方法让调度程序有机会将时间片分配给仍在 finally 子句中等待的最后一个线程:

    for (SubmittedJob job : submittedJobs) {
        try {
            ProcessedResponse response = null;
            try {
                // Try to get answer in short timeout, should be available
                response = job.getFuture().get(10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                job.getFuture().cancel(true);
                continue;
            }
            results.add(response);
        } catch (ExecutionException cause) {
            // exceptions occurred during execution, in any
        } catch (InterruptedException e) {
            // take care
        }
    }

备注:您的代码不现实,因为该getAndProcessResponse方法在不到一毫秒的时间内结束。在那里随机睡眠,比赛条件不会经常出现。

于 2012-03-13T23:41:34.977 回答
0

我支持关于比赛条件的意见。我建议忘记闩锁并使用 java.util.concurrent.ThreadPoolExecutor.awaitTermination(long, TimeUnit)

于 2012-03-15T17:38:05.567 回答