2

我有一个 akka-http 服务,我正在尝试使用alpakka s3 连接器来上传文件。以前我使用的是临时文件,然后使用 Amazon SDK 上传。这种方法需要对 Amazon SDK 进行一些调整以使其更像 scala,但它甚至可以同时处理 1000 个请求。吞吐量并不惊人,但所有请求最终都通过了。这是更改前的代码,没有 alpakka:

```

path("uploadfile") {
    withRequestTimeout(20.seconds) {
        storeUploadedFile("csv", tempDestination) {
            case (metadata, file) =>
                val uploadFuture = upload(file, file.toPath.getFileName.toString)

                onComplete(uploadFuture) {
                    case Success(_) => complete(StatusCodes.OK)
                    case Failure(_) => complete(StatusCodes.FailedDependency)
                }
            }
        }
    }
}

case class S3UploaderException(msg: String) extends Exception(msg)

def upload(file: File, key: String): Future[String] = {
    val s3Client = AmazonS3ClientBuilder.standard()
        .withCredentials(new DefaultAWSCredentialsProviderChain())
        .withRegion(Regions.EU_WEST_3)
        .build()

    val promise = Promise[String]()

    val listener = new ProgressListener() {
        override def progressChanged(progressEvent: ProgressEvent): Unit = {
            (progressEvent.getEventType: @unchecked) match {
                case ProgressEventType.TRANSFER_FAILED_EVENT => promise.failure(S3UploaderException(s"Uploading a file with a key: $key"))
                case ProgressEventType.TRANSFER_COMPLETED_EVENT |
                     ProgressEventType.TRANSFER_CANCELED_EVENT => promise.success(key)
            }
        }
    }

    val request = new PutObjectRequest("S3_BUCKET", key, file)
    request.setGeneralProgressListener(listener)

    s3Client.putObject(request)

    promise.future
}

```

当我将其更改为使用 alpakka 连接器时,代码看起来好多了,因为我们可以将ByteSourcealpakka 和 alpakka连接Sink在一起。然而,这种方法无法处理如此大的负载。当我一次执行 1000 个请求(10 kb 文件)时,只有不到 10% 的请求通过,其余的则失败并出现异常:

akka.stream.alpakka.s3.impl.FailedUpload:超过配置的最大打开请求值 [32]。这意味着这个池的请求队列(HostConnectionPoolSetup(bargain-test.s3-eu-west-3.amazonaws.com,443,ConnectionPoolSetup(ConnectionPoolSettings(4,0,5,32,1,30 seconds,ClientConnectionSettings(Some (User-Agent: akka-http/10.1.3),10秒,1分钟,512,None,WebSocketSettings(,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$4787/1279590204@ 4d809f4c),List(),ParserSettings(2048,16,64,64,8192,64,8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,Map(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, Content-MD5 -> 0, Date -> 0, If-Match -> 0, If-None-Match -> 0 , 用户代理 -> 32),false,true,akka.util。ConstantFun$$$Lambda$4534/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda$4534/1539966798@69c23cd4,akka.util.ConstantFun$$$Lambda$4535/297570074@6b426c59),无,TCPTransport),新,1 second),akka.http.scaladsl.HttpsConnectionContext@7e0f3726,akka.event.MarkerLoggingAdapter@74f3a78b))) 已完全填满,因为池当前处理请求的速度不足以处理传入的请求负载。请稍后重试请求。看 请稍后重试请求。看 请稍后重试请求。看 http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html 了解更多信息。

以下是 Gatling 测试的摘要:

---- 响应时间分布 ---------------------------------------- t < 800毫秒 0 ( 0%)

800 毫秒 < t < 1200 毫秒 0 ( 0%)

t > 1200 毫秒 90 ( 9%)

失败 910 ( 91%)


当我同时执行 100 个请求时,有一半会失败。因此,仍然接近令人满意。

这是一个新代码:```

path("uploadfile") {
    withRequestTimeout(20.seconds) {
        extractRequestContext { ctx =>
            implicit val materializer = ctx.materializer

            extractActorSystem { actorSystem =>

                fileUpload("csv") {

                    case (metadata, byteSource) =>

                        val uploadFuture = byteSource.runWith(S3Uploader.sink("s3FileKey")(actorSystem, materializer))

                        onComplete(uploadFuture) {
                            case Success(_) => complete(StatusCodes.OK)
                            case Failure(_) => complete(StatusCodes.FailedDependency)
                        }
                }            
            }
        }
    }
}

def sink(s3Key: String)(implicit as: ActorSystem, m: Materializer) = {
    val regionProvider = new AwsRegionProvider {
        def getRegion: String = Regions.EU_WEST_3.getName
    }

    val settings = new S3Settings(MemoryBufferType, None, new DefaultAWSCredentialsProviderChain(), regionProvider, false, None, ListBucketVersion2)
    val s3Client = new S3Client(settings)(as, m)

    s3Client.multipartUpload("S3_BUCKET", s3Key)
}

```

可以在此处查看具有两个端点的完整代码

我有一些问题。

1)这是一个功能吗?这就是我们所说的背压吗?

2)如果我希望这段代码表现得像带有临时文件的旧方法(没有失败的请求并且所有请求都在某个时候完成),我该怎么办?我试图为流实现一个队列(链接到下面的源代码),但这没有任何区别。代码可以在这里看到。

(* 免责声明 * 我仍然是 scala 新手,试图快速理解 akka 流并找到解决该问题的方法。这段代码很可能存在一些简单的错误。* 免责声明 *)

4

1 回答 1

0

这是一个背压功能。

Exceeded configured max-open-requests value of [32]在配置max-open-requests中默认设置为 32。流式处理用于处理大量数据,而不是每秒处理许多请求。

Akka 开发人员不得不为max-open-requests. 他们肯定会出于某种原因选择 32。他们不知道它将用于什么。它可以同时发送 1000 个 32KB 文件或 1000 个 1GB 文件吗?他们不知道。但他们仍然希望确保在默认情况下(并且 80%​​ 的人可能使用默认值)应用程序将得到优雅和安全的处理。所以他们不得不限制处理能力。

您要求“现在”执行 1000 个,但我很确定 AWS 不会同时发送 1000 个文件,而是使用了一些队列,如果您有许多小文件要上传,这对您来说也是一个很好的例子。

但是根据您的情况调整它是非常好的!如果您知道您的机器和目标将处理更多的同时连接,您可以将数字更改为更高的值。

此外,对于很多 HTTP 调用,使用缓存的主机连接池

于 2018-08-27T08:17:50.910 回答