3

我将此问题发布到 CXF 列表中,但没有任何运气。所以我们开始吧。我正在尝试将大文件上传到远程服务器(想想它们是虚拟机磁盘)。所以我有一个接受上传请求的宁静服务。上传的处理程序如下所示:

@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Path("/doupload")
public Response receiveStream(MultipartBody multipart) {
    List<Attachment> allAttachments = body.getAllAttachments();
    Attachment att = null;
    for (Attachment b : allAttachments) {
        if (UPLOAD_FILE_DESCRIPTOR.equals(b.getContentId())) {
            att = b;
        }
    }
    Assert.notNull(att);
    DataHandler dh = att.getDataHandler();
    if (dh == null) {
        throw new WebApplicationException(HTTP_BAD_REQUEST);
    }
    try {
        InputStream is = dh.getInputStream();
        byte[] buf = new byte[65536];
        int n;
        OutputStream os = getOutputStream();
        while ((n = is.read(buf)) > 0) {
            os.write(buf, 0, n);
        }
        ResponseBuilder rb = Response.status(HTTP_CREATED);
        return rb.build();
    } catch (IOException e) {
        log.error("Got exception=", e);
        throw new WebApplicationException(HTTP_INTERNAL_ERROR);
    } catch (NoSuchAlgorithmException e) {
        log.error("Got exception=", e);
        throw new WebApplicationException(HTTP_INTERNAL_ERROR);
    } finally {}

}

这段代码的客户端相当简单:

public void sendLargeFile(String filename) {
    WebClient wc = WebClient.create(targetUrl);
    InputStream is = new FileInputStream(new File(filename));
    Response r = wc.post(new Attachment(Constants.UPLOAD_FILE_DESCRIPTOR,
        MediaType.APPLICATION_OCTET_STREAM, is));
}

该代码在功能方面运行良好。在性能方面,我注意到在我的处理程序(receiveStream() 方法)从流中获取第一个字节之前,整个流实际上被持久化到一个临时文件中(使用 CachedOutputStream)。不幸的是,这对我来说是不可接受的。

  • 我的处理程序只是将传入的字节传递给后端存储系统(虚拟机磁盘存储库),等待整个磁盘写入缓存只是再次读取需要大量时间,占用大量资源,并且降低吞吐量。
  • 写入块并再次读取它们会产生成本,因为应用程序在云中运行,并且云提供商对每个读取/写入的块收费。
  • 由于每个字节都写入本地磁盘,我的服务 VM 必须有足够的磁盘空间来容纳所有上传的流的总大小(即,如果我有 10 个上传,每个 100GB,我必须有 1TB 的磁盘来缓存内容)。这又是一笔额外的钱,因为服务 VM 的大小急剧增长,云提供商也会对预置的磁盘大小收费。

鉴于所有这些,我正在寻找一种方法来使用 HTTP InputStream(或尽可能接近它)直接从那里读取附件并在之后处理它。我想这个问题可以转化为以下之一: - 有没有办法告诉 CXF 不做缓存 - 或者 - 有没有办法让 CXF 传递一个输出流(我写的)来使用,而不是使用 CachedOutputStream

我在这里发现了一个类似的问题。决议说使用 CXF 2.2.3 或更高版本,我正在使用 2.4.4(并尝试使用 2.7.0)但没有运气。

谢谢。

4

2 回答 2

3

我认为这在逻辑上是不可能的(无论是在 CXF 还是在其他任何地方)。您正在调用getAllAttachements(),这意味着服务器应该从 HTTP 输入流中收集有关它们的信息。这意味着整个流必须进入内存进行MIME解析。

在您的情况下,您应该直接使用流,并自己进行 MIME 解析:

public Response receiveStream(InputStream input) {

现在您可以完全控制输入,并且可以逐字节将其消耗到内存中。

于 2012-11-28T20:56:34.653 回答
1

我最终以一种不雅的方式解决了这个问题,但它确实有效,所以我想分享我的经验。如果有一些“标准”或更好的方法,请告诉我。

由于我正在编写服务器端,我知道我正在按发送顺序访问所有附件,并在它们流入时对其进行处理。因此,为了反映处理程序方法(上面的receiveStream() 方法)的这种行为,我在服务器端创建了一个名为“@SequentialAttachmentProcessing”的新注释,并用它注释了我的上述方法。

此外,还编写了一个名为 SequentialAttachment 的 Attachment 子类,它的作用类似于链表。它有一个跳过当前附件的skip()方法,当一个附件结束时,hasMore()方法告诉你是否还有另一个。

然后我编写了一个自定义的 multipart/form-data 提供程序,其行为如下:如果目标方法如上注释,则处理附件,否则调用默认提供程序进行处理。当它由我的提供者处理时,它总是最多返回一个附件。因此,它可能会误导非可疑的处理方法。但是,我认为这是可以接受的,因为服务器的编写者必须将该方法注释为“@SequentialAttachmentProcessing”,因此必须知道这意味着什么。

因此,receiveStream() 方法的实现现在类似于:

@POST
@SequentialAttachmentProcessing
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Path("/doupload")
public Response receiveStream(MultipartBody multipart) {
    List<Attachment> allAttachments = body.getAllAttachments();
    Assert.isTrue(allAttachments.size() <= 1);
    if (allAttachment.size() > 0) {
        Attachment head = allAttachments.get(0);
        Assert.isTrue(head instanceof SequentialAttachment);
        SequentialAttachment att = (SequentialAttachment) head;
        while (att != null) {
            DataHandler dh = att.getDataHandler();
            InputStream is = dh.getInputStream();
            byte[] buf = new byte[65536];
            int n;
            OutputStream os = getOutputStream();
            while ((n = is.read(buf)) > 0) {
                os.write(buf, 0, n);
            }
            if (att.hasMore()) {
                att = att.next();
            }
        }
    }
}

虽然这解决了我眼前的问题,但我仍然相信必须有一个标准的方法来做到这一点。我希望这可以帮助别人。

于 2012-12-07T16:55:51.970 回答