2

我相信整个互联网上都没有对此的回答,因为它可能非常复杂,但我会继续问。

基本上,我想在多个 Spring 应用程序之间进行交叉通信。他们每个人都以静态方式提供资源,这是该主题的链接。此服务由其他应用程序实例资本化,这些应用程序实例可以根据请求下载这些文件(现在我正在通过 HTTP 传输文件)。由于Downlolad,我能够下载文件并使用Project Reactor SO问题中的ExchangeFunction从ClientRequest保存文件。

现在我想提升我的代码,以便在连接问题或应用程序在给定的超时时间内暂时不可用的情况下,我能够恢复文件的下载。我在本文中配置了WebClient超时。

现在,我认为这样的代码实际上可以让我处理暂时不可用的服务:

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

Flux<DataBuffer> fileData = Mono.just(filePath)
    .map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
    .map(bytes -> webClient
            .get()
            .uri(uri)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .header("Range", String.format("bytes=%d-", bytes))
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
            .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
            .bodyToFlux(DataBuffer.class)
    )
    .flatMapMany(Function.identity());

DataBufferUtils
        .write(fileData , fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .retry(3)
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .doOnError(e -> !(e instanceof ChannelException), e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnError(ChannelException.class, e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnTerminate(() -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
    .blockLast();

但显然,每当我杀死我的第二个应用程序实例时,我都会收到一大堆错误:

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
2019-10-25T15:41:53.602+0200 [ERROR] [xxx] [N/A:N/A] [r.core.publisher.Operators] { thread=reactor-http-nio-4  } Operator called default onErrorDropped
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
    at reactor.core.Exceptions.bubble(Exceptions.java:154)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
    at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)

以及稍后在同一个堆栈跟踪中:

2019-10-25T15:41:53.602+0200 [WARN] [xxx] [N/A:N/A] [i.n.c.AbstractChannelHandlerContext] { thread=reactor-http-nio-4  } An exception 'reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$BubblingException: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
    at reactor.core.Exceptions.bubble(Exceptions.java:154)
    at reactor.core.publisher.Operators.onErrorDropped(Operators.java:512)
    at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:343)
    at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:399)
    at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:258)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:121)

异常本身并不是什么大问题,但关键是在我将应用程序重新启动后,我的下载不会恢复。

所以是的,我的问题是,我怎么可能恢复下载以及我应该/可以如何处理这里的异常?

4

2 回答 2

3

代码有 2 个问题。首先是在开始时确定文件大小的方法。正确的使用方法AtomicLong就像在DataBufferUtils.write(). 第二个问题是retry()远程调用之间的问题。retryWhen()在处理具有固定退避的远程调用时,这似乎是一种很好的方式。总结一下,这是使我能够在连接/应用程序问题的情况下从正确的字节恢复文件下载的代码

final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

AtomicLong fileSize = new AtomicLong(targetPath.toFile().length());

Flux<DataBuffer> fileDataStream = webClient
                .get()
                .uri(remoteXFerServiceTargetHost)
                .accept(MediaType.APPLICATION_OCTET_STREAM)
                .header("Range", String.format("bytes=%d-", fileSize.get()))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
                .bodyToFlux(DataBuffer.class);

DataBufferUtils
        .write(fileData , fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .retryWhen(Retry.any().fixedBackoff(Duration.ofSeconds(5)).retryMax(5))
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
        .doOnError(e -> !(e instanceof ChannelException), e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnError(ChannelException.class, e -> {
            try {
                Files.deleteIfExists(targetPath);
            } catch (IOException exc) {
                exc.printStackTrace();
            }
        })
        .doOnTerminate(() -> {
            try {
                fileChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        })
    .blockLast();
于 2019-11-04T08:52:52.940 回答
1

嗨,以下是我找到的实现Spring-webflux-file-upload-download

并基于它以下是我的文件下载代码:

import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.io.File;
import java.io.IOException;

@RestController
@RequestMapping("/")
public class DownloadContorller {

    @GetMapping
    public Mono<Void> downloadByWriteWith(ServerHttpResponse response) throws IOException {
        ZeroCopyHttpOutputMessage zeroCopyResponse = (ZeroCopyHttpOutputMessage) response;
        response.getHeaders().set(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=large-file.so");
        response.getHeaders().setContentType(MediaType.APPLICATION_OCTET_STREAM);

        Resource resource = new ClassPathResource("large-file.so");
        File file = resource.getFile();
        return zeroCopyResponse.writeWith(file, 0, file.length());
    }
}

我已经使用它进行了测试wget -c http://localhost:8080/,我能够从中断的地方恢复下载。我测试了它

  1. 从终端上的客户端 (Ctrl+C) 中断
  2. 停止应用程序,然后重新启动它。

我希望它有所帮助。

于 2019-11-01T04:47:16.060 回答