我正在使用 Spring boot 2.3.1、Webflux、带有响应式 mongodb 驱动程序的 Spring Data 和 Amazon SDk 2.14.6 实现一个响应式项目。
我有一个 CRUD,它在 MongoDB 上保留一个实体,并且必须将文件上传到 S3。我正在使用 SDK 反应式方法s3AsyncClient.putObject
,但遇到了一些问题。CompletableFuture抛出以下异常:
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 60000 millis
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[na:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
reactor.core.publisher.Mono.map(Mono.java:3054)
br.com.wareline.waredrive.service.S3Service.uploadFile(S3Service.java:94)
我尝试上传的文件大约有 34kb,它是一个简单的文本文件。
上传方法在我的S3Service.java
类中,它在DocumentoService.java中自动装配
@Component
public class S3Service {
@Autowired
private final ConfiguracaoService configuracaoService;
public Mono<PutObjectResponse> uploadFile(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final String cliente) {
return configuracaoService.findByClienteId(cliente)
.switchIfEmpty(Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, String.format("Configuração com id %s não encontrada", cliente))))
.map(configuracao -> uploadFileToS3(headers, body, fileKey, configuracao))
.doOnSuccess(response -> {
checkResult(response);
});
}
private PutObjectResponse uploadFileToS3(final HttpHeaders headers, final Flux<ByteBuffer> body, final String fileKey, final Configuracao configuracao) {
final long length = headers.getContentLength();
if (length < 0) {
throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
}
final Map<String, String> metadata = new HashMap<>();
final MediaType mediaType = headers.getContentType() != null ? headers.getContentType() : MediaType.APPLICATION_OCTET_STREAM;
final S3AsyncClient s3AsyncClient = getS3AsyncClient(configuracao);
return s3AsyncClient.putObject(
PutObjectRequest.builder()
.bucket(configuracao.getBucket())
.contentLength(length)
.key(fileKey)
.contentType(mediaType)
.metadata(metadata)
.build(),
AsyncRequestBody.fromPublisher(body))
.whenComplete((resp, err) -> s3AsyncClient.close())
.join();
}
public S3AsyncClient getS3AsyncClient(final Configuracao s3Props) {
final SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
.readTimeout(Duration.ofMinutes(1))
.writeTimeout(Duration.ofMinutes(1))
.connectionTimeout(Duration.ofMinutes(1))
.maxConcurrency(64)
.build();
final S3Configuration serviceConfiguration = S3Configuration.builder().checksumValidationEnabled(false).chunkedEncodingEnabled(true).build();
return S3AsyncClient.builder()
.httpClient(httpClient)
.region(Region.of(s3Props.getRegion()))
.credentialsProvider(() -> AwsBasicCredentials.create(s3Props.getAccessKey(), s3Props.getSecretKey()))
.serviceConfiguration(serviceConfiguration)
.overrideConfiguration(builder -> builder.apiCallTimeout(Duration.ofMinutes(1)).apiCallAttemptTimeout(Duration.ofMinutes(1)))
.build();
}
我的实现基于 Amazon SDK 文档和https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/s3/src/main/java/com/example上的代码示例/s3/S3AsyncOps.java
我无法弄清楚异步客户端超时问题的原因是什么。奇怪的是,当我使用相同的S3AsyncClient从存储桶下载文件时,它可以工作。我试图将S3AsyncClient中的超时时间增加到大约 5 分钟,但没有成功。我不知道我做错了什么。