1

我创建了一个返回无限 Flux 的服务器和一个从响应中异步读取对象的客户端。我希望客户端取消订阅 Flux 并停止处理它。

服务器的控制器:

@GetMapping(path = "/infinite", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreamOfLongs() {
    return Flux.generate(sink -> sink.next("x"));
}

客户端:

WebClient client = WebClient.create("http://localhost:8080");
    Flux<String> flux = client.get()
            .uri("/infinite")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class);
    Disposable disposable = flux.subscribe(consumer);
    Executors.newSingleThreadScheduledExecutor().schedule(() -> disposable.dispose(), 5, TimeUnit.SECONDS);

这是取消订阅流的正确方法吗?
当客户端“想要”停止读取更多数据时,它需要做什么?

当客户端取消订阅(使用disposable.dispose())时,服务器端会抛出2个异常(IOException和UnsupportedOperationException):

java.io.IOException:
sun.nio.ch.FileDispatcherImpl.writev0(本机方法)的管道损坏 ~[na:1.8.0_131]
在 sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)~[na :1.8.0_131]
在 sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_131]
在 sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~ [na :1.8.0_131]
在 io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty .channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannel$ AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) ~ [netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar: 4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush( CombinedChannelDuplexHandler.java:533)~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)~[netty-transport-4.1 .16.Final.jar:4.1.16.Final]
在 io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext. java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.16 .Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io .netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender。 onComplete(ChannelOperationsHandler.java:505) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) ~[reactor -core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.1.RELEASE.jar: 3.1.1.RELEASE]
在 reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Flux.subscribe(Flux. java:6516) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:433) ~[reactor-netty-0.7 .1.RELEASE.jar:0.7.1.RELEASE]
在 reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:179) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1。发布]
在 io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext. java:814) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.16 .Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io .netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
在 reactor.ipc.netty.NettyOutbound.lambda$sendObject$6 (NettyOutbound.java:298)~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:106)~[reactor- netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
在 reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1 .RELEASE]
在 reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:356) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
在 reactor.core.publisher.FluxConcatMap$ConcatMapDelayed .drain(FluxConcatMap.java:744) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.onNext(FluxConcatMap.java:581) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxMap$MapSubscriber.onNext (FluxMap.java:108) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:150) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
在 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.1.RELEASE .jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 io.github.msayag.webflux.MyController.lambda$getStreamOfLongs$0(MyController.java:44) ~[classes/:na]
...
在 io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:第858章~[netty-common-4.1.16.Final.jar:4.1.16.Final]
在java.lang.Thread.run(Thread.java:748)~[na:1.8.0_131]

其次是

2017-11-24 01:04:09.476 错误 83663 --- [ctor-http-nio-2] oswsadapter.HttpWebHandlerAdapter:无法处理请求

java.lang.UnsupportedOperationException: null
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) ~[na:1.8.0_131]
at org.springframework.http.HttpHeaders.set(HttpHeaders.java:1439) ~ [spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
在 org.springframework.http.HttpHeaders.setContentType(HttpHeaders.java:849) ~[spring-web-5.0.1.RELEASE.jar: 5.0.1.RELEASE]
在 org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.write(AbstractErrorWebExceptionHandler.java:235) ~[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0 .M6]
在 org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.lambda$handle$1(AbstractErrorWebExceptionHandler.java:228) ~[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6]
在 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators$MonoSubscriber.complete (Operators.java:1092) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor- core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.1.RELEASE.jar:3.1 .1.发布]
在 reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber。 request(FluxPeekFuseable.java:138) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230) ~[reactor -core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172) ~[reactor-core-3.1.1.RELEASE.jar: 3.1.1.RELEASE]
在 reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext( MonoFlatMap.java:150) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core- 3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1 .RELEASE]
在 reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1463) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators$MultiSubscriptionSubscriber。 onSubscribe(Operators.java:1337) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core -3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1. RELEASE]
在 reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators.complete( Operators.java:125) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.1 .1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap. java:60) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1 .RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap$FlatMapInner.onError (MonoFlatMap.java:251)~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators.error( Operators.java:175) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:129) ~[reactor-core-3.1 .1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 org.springframework.http.server.reactive。 ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:339) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87 ) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1332) ~[reactor-core-3.1.1. RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1135) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:300) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber。 onError(MonoIgnoreElements.java:75)~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.drain(FluxConcatMap.java:660)~[reactor -core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.onNext(FluxConcatMap.java:581) ~[reactor-core-3.1.1.RELEASE.jar: 3.1.1.RELEASE]
在 reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
在 org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:150) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
在 reactor.core.publisher。 FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164 ) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
at io.github.msayag.webflux.MyController.lambda$getStreamOfLongs$0(MyController.java:44) ~[classes/:na]
...
在 io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.16.Final.jar:4.1.16.Final]
在 java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]

4

1 回答 1

2

据我所知,你这样做是对的。

Disposable::disposeSubscriber有效地取消了流,从您不再对接收数据感兴趣的角度来看,这是您应该做的。

在旁边调用它WebClient会导致关闭 HTTP 连接。我认为没有一种“更干净”的方式可以告诉服务器您不想再接收数据了。使用 HTTP/2,情况可能会有所不同,因为可以在不关闭整个连接的情况下关闭 HTTP 流。

从服务器的角度来看,客户端主动取消与客户端因错误而关闭连接看起来相同。所以这些例外都表明

  1. 服务器尝试写入时连接已关闭
  2. 响应没有被正确处理(服务器还有东西要写)

如果您对此行为有改进想法,请在https://jira.spring.io上创建票证

于 2017-11-24T08:21:19.363 回答