0

目前,我正在使用 Java 的 S3 SDK 中的 AWS 的 TransferManager 触发从 SFTP 服务器到 S3 的上传。我触发此上传的方式如下:

(伪代码...)

    @Autowired
    TransferManager transferManager;

    @Autowired
    SftpStreamFactory sftpStreamFactory;

    SftpStream sftpStream = sftpStreamFactory.createStream(filePath);
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentLength(sftpStream.getSizeBytes());
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, sftpStream.getStream(), objectMetadata);
    putObjectRequest.setGeneralProgressListener(new UploadBeginEndNotificationListener(uploadRequest, statusNotifier));
    
    transferManager.upload(putObjectRequest);

这是 的定义SftpStream

@AllArgsConstructor
public class SftpStreamFactory {

@Getter
@AllArgsConstructor
public static class SftpStream {
    private final long sizeBytes;
    private final InputStream stream;
}

private final SftpRemoteFileTemplate sftpTemplate;
private final SftpProperties sftpProperties;

public SftpStream createStream(Path relativePath) {
    return sftpTemplate.<SftpStream, ChannelSftp>executeWithClient(session -> createStream(session, relativePath));
}

SftpStream createStream(ChannelSftp channelSftp, Path relativePath) {

    String path = sftpProperties.getRoot().resolve(relativePath).toString();

    try {
        SftpATTRS fileAttrs = channelSftp.lstat(path);
        long size = fileAttrs.getSize();
        return new SftpStream(size, channelSftp.get(path));
    }
    catch (SftpException e) {
        throw new UncheckedIOException(new NestedIOException("SFTP Error", e));
    }
}

}

这种上传方法效果很好。但是,如果分段上传在中间被暂停/取消/以其他方式中止,我们希望从中断的地方继续,而不是重新开始。我们知道 TransferManagersresumeUpload方法需要一个PersistableUpload.

但是,在 javadoc 中PersistableUpload,它期望file在构造函数中传递一个路径,然后尝试从中创建一个File对象: https ://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws /services/s3/transfer/PersistableUpload.html

我们想知道的是,在没有这个文件对象的情况下是否可以恢复上传,我们无法从我们的ChannelSftp. 也就是说,我们可以从流而不是文件恢复上传吗?或者我们是否必须切换到使用低级 s3 api 来执行这样的恢复。任何建议表示赞赏。

编辑 - 进行了更多研究,甚至为已经存在的上传传递了 UploadId,如果没有文件,doUpload 方法将抛出异常。有任何想法吗?

4

1 回答 1

0

答案是,不,没有文件就无法恢复上传,但是对于您的类似情况,有一种解决方法:

#中止前暂停

  1. 当且仅当,在任何中断发生之前,尝试暂停连接
boolean forceCancel = true;
PauseResult<PersistableUpload> pauseResult = myUpload.tryPause(forceCancel);
  1. 将数据保存info to resume到文件
PersistableUpload persistableUpload = pauseResult.getInfoToResume();

File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE"); //blob
if (!f.exists())
    f.createNewFile();
FileOutputStream fos = new FileOutputStream(f);

// Serialize the persistable upload to the file.
persistableUpload.serialize(fos);
fos.close();
  1. 有时稍后再继续
TransferManager tm = new TransferManager();
FileInputStream fis = new FileInputStream(new File("UNIQUE-ID-FOR-UPLOADED-FILE"));

// Deserialize PersistableUpload information from disk.
PersistableUpload persistableUpload = PersistableTransfer.deserializeFrom(fis);

// Call resumeUpload with PersistableUpload.
tm.resumeUpload(persistableUpload);

fis.close();

#如果发生某些事情(例如 JVM 崩溃),请保存流以维护文件

用于持久S3SyncProgressListenerTransferManager#upload每个更改并将数据序列化到磁盘。

transferManager.upload(putObjectRequest, new S3SyncProgressListener() {

    ExecutorService executor = Executors.newFixedThreadPool(1);

    @Override
    public void onPersistableTransfer(final PersistableTransfer persistableTransfer) {

       executor.submit(new Runnable() {
          @Override
          public void run() {
              try {
                  File f = new File("UNIQUE-ID-FOR-UPLOADED-FILE");
                  if (!f.exists()) {
                      f.createNewFile();
                  }
                  FileOutputStream fos = new FileOutputStream(f);
                  persistableTransfer.serialize(fos);
                  fos.close();
              } catch (IOException e) {
                  throw new RuntimeException("Unable to persist transfer to disk.", e);
              }
          }
       });
    }
});

希望能帮助到你。

于 2020-07-06T15:20:01.237 回答