0

反应堆菜鸟在这里。

这更像是一个 HowTo 问题。

假设我有一个要抓取的网站,其中包含一组分页的搜索结果。搜索结果页数未知。每个搜索页面都有一个指向下一页的链接。我想从所有页面中抓取所有搜索结果并处理每个搜索结果。

我如何使用 Reactor (Mono/Flux) 在 Java 中实现这一点?

我想尽可能“积极”地做到这一点。

基本上,以下命令式伪代码的 Reactor (3.x) 版本:

    String url = "http://example.com/search/1";
    Optional<Document> docOp = getNextPage(url);    (1)
    while (docOp.isPresent()) {
        Document doc = docOp.get();
        processDoc(doc);                            (2)
        docOp = getNextPage(getNextUrl(doc));       (3)
    }

    // (1) Get the first page of search results
    // (2) Process all the search results on this page asynchronously
    // (3) Find the next page URL, and get that page
4

1 回答 1

0

https://gitter.im/reactor/reactor的帮助下,我找到了这个解决方案。它可能并不理想。我很想得到任何人可能会看到的任何问题的反馈。

public void scrape() {

    Try<Document> firstDocTry = this.getSearchResultsPage(Option.<Document>none().toTry());    (1)

    // Generate a flux where each element in the flux is created using the current element
    Flux.<Try<Document>, Try<Document>>generate(() -> firstDocTry, (docTry, sink) -> {         (2)
            docTry = this.getSearchResultsPage(docTry);
            docTry.isFailure() ? sink.complete() : sink.next(docTry);
            return docTry;
        })
        .flatMap(docTry -> this.transformToScrapedLoads(docTry))                               (3)
        .log()
        .subscribe(scrapedLoad ->
            scrapedLoadRepo.save(scrapedLoad)                                                  (4)
        );
}

protected Try<Document> getSearchResultsPage(Try<Document> docTry) {
    ...
}

protected Flux<ScrapedLoad> transformToScrapedLoads(Try<Document> docTry) {
    ...
}

(1) 在这里使用Javaslang 的一元Try 和Option。'firstDocTry' 为生成器播种。如果没有提供 Document,getSearchResultsPage() 知道从搜索的第一页开始。

(2) 在这里使用生成器。Flux 中发布的每个元素都由之前的元素决定

(3)transform方法将每个doc转换为一个Flux,合并发送订阅为单个Flux

(4) 订阅者对 Flux 产生的每个元素进行操作。在这种情况下,持久化它们。

于 2017-02-03T20:55:47.127 回答