我相信整个互联网上都没有对此的回答,因为它可能非常复杂,但我会继续问。
基本上,我想在多个 Spring 应用程序之间进行交叉通信。他们每个人都以静态方式提供资源,这是该主题的链接。此服务由其他应用程序实例资本化,这些应用程序实例可以根据请求下载这些文件(现在我正在通过 HTTP 传输文件)。由于Downlolad,我能够下载文件并使用Project Reactor SO问题中的ExchangeFunction从ClientRequest保存文件。
现在我想提升我的代码,以便在连接问题或应用程序在给定的超时时间内暂时不可用的情况下,我能够恢复文件的下载。我在本文中配置了WebClient
超时。
现在,我认为这样的代码实际上可以让我处理暂时不可用的服务:
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileData = Mono.just(filePath)
.map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
.map(bytes -> webClient
.get()
.uri(uri)
.accept(MediaType.APPLICATION_OCTET_STREAM)
.header("Range", String.format("bytes=%d-", bytes))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
.onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
.bodyToFlux(DataBuffer.class)
)
.flatMapMany(Function.identity());
DataBufferUtils
.write(fileData , fileChannel)
.map(DataBufferUtils::release)
.doOnError(throwable -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.retry(3)
.doOnComplete(() -> {
try {
fileChannel.force(true);
} catch (IOException e) {
e.printStackTrace();
}
})
.doOnError(e -> !(e instanceof ChannelException), e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnError(ChannelException.class, e -> {
try {
Files.deleteIfExists(targetPath);
} catch (IOException exc) {
exc.printStackTrace();
}
})
.doOnTerminate(() -> {
try {
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
})
.blockLast();
但显然,每当我杀死我的第二个应用程序实例时,我都会收到一大堆错误:
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
2019-10-25T15:41:53.602+0200 [ERROR] [xxx] [N/A:N/A] [r.core.publisher.Operators] { thread=reactor-http-nio-4 } Operator called default onErrorDropped
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at reactor.core.Exceptions.bubble(Exceptions.java:154)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
以及稍后在同一个堆栈跟踪中:
2019-10-25T15:41:53.602+0200 [WARN] [xxx] [N/A:N/A] [i.n.c.AbstractChannelHandlerContext] { thread=reactor-http-nio-4 } An exception 'reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at reactor.core.Exceptions.bubble(Exceptions.java:154)
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)
异常本身并不是什么大问题,但关键是在我将应用程序重新启动后,我的下载不会恢复。
所以是的,我的问题是,我怎么可能恢复下载以及我应该/可以如何处理这里的异常?