5

我在玩 Java8 的流和CompletableFutures。我预先存在的代码有一个类,它接受一个 URL 并下载它:

public class FileDownloader implements Runnable {
    private URL target;
    public FileDownloader(String target) {
        this.target = new URL(target);
    }
    public void run() { /* do it */ }
}

现在,这个类从另一个发出的部分List<String>(单个主机上的多个目标)获取它的信息。

我已将周围的代码切换为CompletableFuture

public class Downloader {
    public static void main(String[] args) {
        List<String> hosts = fetchTargetHosts();
        for (String host : hosts) {
            HostDownloader worker = new HostDownloader(host);
            CompletableFuture<List<String>> future = 
                CompletableFuture.supplyAsync(worker);
            future.thenAcceptAsync((files) -> {
                for (String target : files) {
                    new FileDownloader(target).run();
                }
            });
        }
    }

    public static class HostDownloader implements Supplier<List<String>> {
        /* not shown */ 
    }
    /* My implementation should either be Runnable or Consumer.
       Please suggest based on a idiomatic approach to the main loop.
     */
    public static class FileDownloader implements Runnable, Consumer<String> { 
        private String target;
        public FileDownloader(String target) {
            this.target = target;
        }

        @Override
        public void run() { accept(this.target); }

        @Override
        public void accept(String target) {
            try (Writer output = new FileWriter("/tmp/blubb")) {
                output.write(new URL(target).getContent().toString());
            } catch (IOException e) { /* just for demo */ }
        }
    }
}

现在,这感觉不自然。我正在产生一个Strings 流,而我FileDownloader一次消耗其中一个。是否有现成的东西可以使我的单个值ConsumerLists 一起使用,还是我在for这里陷入循环?

我知道将循环移入accept并制作 a是微不足道的Consumer<List<String>>,这不是重点。

4

3 回答 3

3

将两个直接相关的步骤分解为两个异步步骤是没有意义的。他们仍然相互依赖,如果分离有任何影响,那将不会是积极的。

你可以简单地使用

List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().forEach(fileDownloader));

或者,假设FileDownloader没有关于下载的可变状态:

for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().parallelStream().forEach(fileDownloader));

supplyAsync这仍然具有与使用plus的原始方法相同的并发级别thenAcceptAsync,只是因为这两个相关步骤无论如何都不能同时运行,所以简单的解决方案是将两个步骤放入一个简洁的操作中,该操作将异步执行。


但是,此时值得注意的CompletableFuture是,不建议将整个使用用于此操作。正如它的文档所述:

公共池的问题在于其预先配置的并发级别取决于 CPU 内核的数量,并且如果线程在 I/O 操作期间被阻塞,则不会调整。换句话说,它不适合 I/O 操作。

不像Stream,CompletableFuture允许你Executor异步操作指定 an ,这样你就可以配置你自己Executor的适合 I/O 操作,另一方面,当你处理 an 时Executor,根本不需要CompletableFuture,至少不需要对于这样一个简单的任务:

List<String> hosts = fetchTargetHosts();

int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);

FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
    for(String target: new HostDownloader(host).get())
        connEs.execute(()->fileDownloader.accept(target));
});

在这个地方,您可以考虑将 的代码内联FileDownloader.accept到 lambda 表达式中,或者将其恢复为 aRunnable以便您可以将内部循环的语句更改为connEs.execute(new FileDownloader(target)).

于 2015-05-20T12:23:15.863 回答
1

我认为你需要这样做 forEach :

for (String host : hosts) {
    HostDownloader worker = new HostDownloader(host);
    CompletableFuture<List<String>> future = 
            CompletableFuture.supplyAsync(worker);
    future.thenAcceptAsync(files -> 
            files.stream()
            .forEach(target -> new FileDownloader(target).run())
    );
}

顺便说一句,你可以对主循环做同样的事情......

编辑:由于 OP 编辑​​了原始帖子,添加了 FileDownloader 的实现细节,我正在相应地编辑我的答案。Java 8 函数式接口旨在允许使用 lambda expr 代替具体的 Class。它并不意味着像常规接口 9 那样实现,尽管它可以实现)因此,“利用”Java 8 消费者意味着用接受的代码替换 FileDownloader,如下所示:

for (String host : hosts) {
    HostDownloader worker = new HostDownloader(host);
    CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(worker);
    future.thenAcceptAsync(files -> 
            files.forEach(target -> {
                try (Writer output = new FileWriter("/tmp/blubb")) {
                    output.write(new URL(target).getContent().toString());
                } catch (IOException e) { /* just for demo */ }
            })
    );
}
于 2015-05-20T09:31:21.413 回答
1

另一种选择是:

CompletableFuture.supplyAsync(worker)
                 .thenApply(list -> list.stream().map(FileDownloader::new))
                 .thenAccept(s -> s.forEach(FileDownloader::run));
于 2015-05-20T09:50:35.420 回答