0

我正在做一个反应/通量/单休息服务。后端是 Oracle,使用 rxjava2-jdbc。

如何克服这个阻塞错误?

我正在通过示例学习 rx,因此很高兴了解防止感觉常规使用的列表操作的概念细节。

  1. Repository 从 rx/database 返回一个 Flux:

回购返回通量<>

  1. 处理程序尝试将该列表/通量添加到另一个 Protobuf 对象 SearchResponse,但失败。

AppHander 尝试将客户列表添加到 SearchResponse protobuf 对象

短栈恍惚:

转换 List<Proto.SearchResponse> 导致错误

完整的堆栈跟踪:

2020-02-23T10:43:37,967 INFO  [main] o.s.d.r.c.RepositoryConfigurationDelegate: Bootstrapping Spring Data JDBC repositories in DEFAULT mode.
2020-02-23T10:43:38,046 INFO  [main] o.s.d.r.c.RepositoryConfigurationDelegate: Finished Spring Data repository scanning in 69ms. Found 0 JDBC repository interfaces.
2020-02-23T10:43:38,988 INFO  [main] c.z.h.HikariDataSource: HikariPool-1 - Starting...
2020-02-23T10:43:39,334 INFO  [main] c.z.h.HikariDataSource: HikariPool-1 - Start completed.
2020-02-23T10:43:40,196 INFO  [main] o.s.b.w.e.n.NettyWebServer: Netty started on port(s): 8080
2020-02-23T10:43:40,199 INFO  [main] o.s.b.StartupInfoLogger: Started App in 4.298 seconds (JVM running for 5.988)
2020-02-23T10:44:01,307 ERROR [reactor-http-nio-2] o.s.c.l.CompositeLog: [d806b05e]  500 Server Error for HTTP GET "/webflux/customers"
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:160)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP GET "/webflux/customers" [ExceptionHandlingWebHandler]
Stack trace:
        at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:160)
        at com.google.protobuf.AbstractMessageLite$Builder.addAllCheckingNulls(AbstractMessageLite.java:372)
        at com.google.protobuf.AbstractMessageLite$Builder.addAll(AbstractMessageLite.java:434)
        at pn.api.protobuf.Proto$SearchResponse$Builder.addAllCustomers(Proto.java:3758)
        at pn.api.controller.AppHandler.getAllCustomers(AppHandler.java:24)
        at org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61)
        at org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:161)
        at org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:146)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2199)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2007)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1881)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
        at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)
        at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:228)
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:465)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:167)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:830)

存储库.java

    public Flux<Proto.Customer> allCustomers() {//rxjava2 returns Flowable<> ... Flux<>
        Flowable<Proto.Customer> customerFlowable =
                db.select(queryAllCustomers).get(new CustomerResultSetMapper());
        return Flux.from(customerFlowable);
    }

AppHandler.java

  public Mono<ServerResponse> getAllCustomers(ServerRequest request) {
    Flux<Proto.Customer> customers = repository.allCustomers();

    Proto.SearchResponse out = Proto.SearchResponse.newBuilder()
            .addAllCustomers(customers.toIterable()).build();

    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(out, Proto.SearchResponse.class);
  }
4

0 回答 0