1

我想使用并行处理但在特定超时内检索多个“代价高昂”的结果。

我正在使用GPars Dataflow.task 但看起来我遗漏了一些东西,因为只有当所有数据流变量都被绑定时,进程才会返回。

def timeout = 500
def mapResults = []
GParsPool.withPool(3) { 
    def taskWeb1 = Dataflow.task {
        mapResults.web1 = new URL('http://web1.com').getText()
    }.join(timeout, TimeUnit.MILLISECONDS)
    def taskWeb2 = Dataflow.task {
        mapResults.web2 = new URL('http://web2.com').getText()
    }.join(timeout, TimeUnit.MILLISECONDS)
    def taskWeb3 = Dataflow.task {
        mapResults.web3 = new URL('http://web3.com').getText()
    }.join(timeout, TimeUnit.MILLISECONDS)
}

我确实在GPars Timeouts 文档中看到了一种使用 Select 在超时内获得最快结果的方法。但我正在寻找一种在给定时间范围内检索尽可能多的结果的方法。

有没有更好的“GPars”方法来实现这一目标?还是使用 Java 8 Future/Callable ?

4

1 回答 1

1

由于您也对基于 Java 8 的解决方案感兴趣,因此可以使用以下方法:

int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
    Map<String, CompletableFuture<String>> map = 
        Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
            .collect(
                Collectors.toMap(
                    // the key will be the URL
                    Function.identity(),
                    // the value will be the CompletableFuture text fetched from the url
                    (url) -> CompletableFuture.supplyAsync(
                        () -> readUrl(url, timeout), 
                        executorService
                    )
                )
            );
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);

    //print the resulting map, cutting the text at 100 chars
    map.entrySet().stream().forEach(entry -> {
        CompletableFuture<String> future = entry.getValue();
        boolean completed = future.isDone() 
                && !future.isCompletedExceptionally() 
                && !future.isCancelled(); 
        System.out.printf("url %s completed: %s, error: %s, result: %.100s\n",
            entry.getKey(),
            completed, 
            future.isCompletedExceptionally(),
            completed ? future.getNow(null) : null);
    });
} catch (InterruptedException e) {
    //rethrow
} finally {
    executorService.shutdownNow();
}

这将为您提供与Future您拥有的 URL 一样多的 s,但让您有机会查看是否有任何任务因异常而失败。如果您对这些异常不感兴趣,可以简化代码,只对成功检索的内容感兴趣:

int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
    Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
    Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
        .forEach(url -> {
            CompletableFuture
                .supplyAsync(
                    () -> readUrl(url, timeout), 
                    executorService
                ).thenAccept(content -> map.put(url, content));
        });
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);

    //print the resulting map, cutting the text at 100 chars
    map.entrySet().stream().forEach(entry -> {
        System.out.printf("url %s completed, result: %.100s\n",
            entry.getKey(), entry.getValue() );
    });
} catch (InterruptedException e) {
    //rethrow
} finally {
    executorService.shutdownNow();
}

在打印结果之前,这两个代码都将等待大约 250 毫秒(由于将任务提交给 executor 服务,它只会多花一点时间)。我发现大约 250 毫秒是可以在我的网络上获取其中一些 url-s 的阈值,但不一定是全部。随意调整超时进行实验。

对于该readUrl(url, timeout)方法,您可以使用像Apache Commons IO这样的实用程序库。即使您没有明确考虑该timeout参数,提交给执行器服务的任务也会收到中断信号。我可以为此提供一个实现,但我认为它超出了您问题中主要问题的范围。

于 2016-11-07T02:59:22.063 回答