0

我们正在使用 spring boot 2.0.0.BUILD_SNAPSHOT 和 spring boot webflux 5.0.0,目前我们无法根据请求将通量传输给客户端。

目前我正在从迭代器创建通量:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }
    });
}

根据要求,我只是在做:

@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
    return this.provider.getAllFlux();
}

localhost:8080/all当我现在在 10 秒后本地调用时,我得到一个503状态码。当我请求/all使用时,也与客户端一样WebClient

public Flux<ItemIgnite> getAllPoducts(){
    WebClient webClient = WebClient.create("http://localhost:8080");

    Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
    f.subscribe(System.out::println);
    return f;

}

什么都没发生。没有数据被传输。

当我改为执行以下操作时:

public Flux<List<ItemIgnite>> getAllFluxMono() {
    return Flux.just(this.getAllList());
}

@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
    return this.provider.getAllFluxMono();
}

这是工作。我猜是因为所有数据都已经完成加载并刚刚传输到客户端,因为它通常会在不使用通量的情况下传输数据。

我必须改变什么才能让通量将数据流式传输到请求这些数据的 Web 客户端?

编辑

我在ignite 缓存中有数据。所以我getAllIterator正在从 ignite 缓存中加载数据:

public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
    return this.igniteCache.iterator();
}

编辑

flux.complete()像@Simon Baslé 建议的那样添加:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete(); // see here
    });
}

解决503了浏览器中的问题。但这并不能解决WebClient. 仍然没有传输数据。

编辑 3

使用:publishOn_Schedulers.parallel()

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.<ItemIgnite>create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete();
    }).publishOn(Schedulers.parallel());
}

不改变结果。

在这里,我向您发布 WebClient 收到的内容:

value :[Item ID: null, Product Name: null, Product Group: null]
complete

所以看起来他得到了一个项目(超过 35.000 个),并且值是空的,他正在完成之后。

4

2 回答 2

4

跳出来的一件事是你永远不会调用flux.complete()你的create.

但实际上有一个工厂操作员专门用于将 a 转换Iterable为 a Flux,所以你可以这样做Flux.fromIterable(this)

编辑:如果您Iterator隐藏了数据库请求(或任何阻塞 I/O)之类的复杂性,请注意这会带来麻烦:反应链中的任何阻塞,如果没有publishOn使用只有整个链,但其他反应进程都很好(因为线程可以并且将被多个反应进程使用)。

既不create也不fromIterable做任何特别的事情来防止阻塞源。我认为你正面临着这样的问题,从你对WebClient.

于 2017-07-10T09:02:46.740 回答
0

问题是我ItemIgnite转移的对象。系统 Flux 似乎无法处理这个问题。因为如果我将原始代码更改为以下内容:

public Flux<String> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue().toString());
        }
    });
}

一切正常。没有publishOn和没有flux.complete()。也许有人知道为什么这是有效的。

于 2017-07-10T11:01:54.520 回答