3

Iteratee是否可以在 Java中使用 Play 的s?我找不到任何关于IterateeJava中使用的示例和文档,只有 Scala。我猜让 Iteratees 使用 PLay API 在 Java 中工作会有点混乱的代码(很多匿名Funtion1<?,>)......

如果可能的话,我想创建一个 App 控制器,它可以接受通过 HTTPs 分块传输编码上传的多部分文件上传,并将这些消息块向下游解析到 S3 存储。关于如何在 Java 中解决这个问题的任何想法?

干杯。

4

2 回答 2

4

Java SDK 包含类TransferManager来执行异步上传。它包含一个自己的可配置线程池。

用 Java 编写的迭代器可能能够将上传文件的字节直接推送到 S3,但代码看起来很笨拙且难以配置。对于很多用例,将文件从浏览器流式传输到临时文件(因此它没有完全在内存中)然后流式传输到 S3 就足够了。

我在Github上创建了一个示例项目如何做到这一点:

package controllers;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import play.*;
import play.libs.F.Function;
import play.libs.F.Promise;
import play.mvc.*;
import views.html.index;
import scala.concurrent.Promise$;

public class Application extends Controller {
    //don't forget to tm.shutdownNow() on application termination
    //you can configure a Thread pool http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/transfer/TransferManager.html#TransferManager(com.amazonaws.services.s3.AmazonS3, java.util.concurrent.ThreadPoolExecutor)
    private static TransferManager tm;
    private static final String BUCKET_NAME = "your-bucket";

    //this is bad style, use a plugin!
    static {
        final String accessKey = System.getenv("AWS_ACCESS");
        final String secretKey = System.getenv("AWS_SECRET");
        final AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        tm = new TransferManager(credentials);
    }

    /** shows a form to upload a file */
    public static Result index() {
        return ok(index.render("Your new application is ready."));
    }

    /** uploads a file to Amazon S3. */
    public static Promise<Result> upload() {
        final Http.MultipartFormData.FilePart meta = request().body().asMultipartFormData().getFile("picture");
        final String key = meta.getFilename();
        final Upload upload = tm.upload(BUCKET_NAME, key, meta.getFile());
        Logger.info("start upload " + meta.getFilename());
        return asPromise(meta.getFilename(), upload).map(new Function<UploadResult, Result>() {
            @Override
            public Result apply(UploadResult uploadResult) throws Throwable {
                Logger.info("finished " + meta.getFilename());
                return ok(asString(uploadResult));
            }
        });
    }

    private static String asString(UploadResult result) {
        return "UploadResult{bucketName=" + result.getBucketName() + ", key=" + result.getKey() + ", version=" + result.getVersionId() + ", ETag=" + result.getETag() + "}";
    }

    private static Promise<UploadResult> asPromise(final String filename, final Upload upload) {
        final scala.concurrent.Promise<UploadResult> scalaPromise = Promise$.MODULE$.apply();
        upload.addProgressListener(new ProgressListener() {
            @Override
            public void progressChanged(ProgressEvent progressEvent) {
                if (progressEvent.getEventCode() == ProgressEvent.CANCELED_EVENT_CODE) {
                    scalaPromise.failure(new RuntimeException("canceled " + filename));
                } else if (progressEvent.getEventCode() == ProgressEvent.FAILED_EVENT_CODE) {
                    scalaPromise.failure(new RuntimeException("failed " + filename));
                } else if(progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
                    Logger.info("done " + filename);
                    try {
                        scalaPromise.success(upload.waitForUploadResult());
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        });
        return Promise.wrap(scalaPromise.future());
    }

}

在不同浏览器窗口中上传两个文件的示例日志输出:

[info] play - play-internal-execution-context-1 Application started (Dev)
[info] application - play-akka.actor.default-dispatcher-3 start upload file5.ext
[info] application - play-akka.actor.default-dispatcher-2 start upload file1.ext
[info] application - java-sdk-progress-listener-callback-thread done file1.ext
[info] application - play-akka.actor.default-dispatcher-5 finished file1.ext
[info] application - java-sdk-progress-listener-callback-thread done file5.ext
[info] application - play-akka.actor.default-dispatcher-5 finished file5.ext
于 2013-10-13T01:02:58.450 回答
0

我认为可以在 Java 中实现迭代。

在这个问题中有一个由 Sadache 编写的 Scala 中执行此操作的示例:Play 2.x : Reactive file upload with Iteratees

请注意,虽然没有可用于 S3 的异步 api 库,因此如果您使用官方 amazon api java 库,那么您将在上传结束时被阻止。

于 2013-09-20T08:42:33.043 回答