1

我正在开发一个 Finagle HTTP 应用程序,其中服务的实现没有利用 Futures 并通过第三方库访问 Redis。此类服务具有以下形式:

class SampleOldService extends Service[Request, Response] {
  def apply(req: Request): Future[Response] = {
    val value: Int = getValueFromRedis()
    val response: Response = buildResponse(value)
    Future.value(response)
  }
}

(它们比这复杂得多——这里的重点是它们是同步的。)

在某个时候,我们开始使用 Futures 和 Finagle Redis API 开发新服务。Redis 调用封装在 Store 类中。新服务具有以下形式:

class SampleNewService extends Service[Request, Response] {
  def apply(req: Request): Future[Response] = {
    val value: Future[Int] = Store.getValue()
    val response: Future[Response] = value map buildResponse
    response
  }
}

(它们比这复杂得多——这里的重点是它们是异步的。)

我们开始重构旧服务以利用异步性和 Futures。我们希望逐步地做到这一点,而不必一次完全重新实现它们。

第一步是尝试使用新的 Store 类,代码如下:

class SampleOldService extends Service[Request, Response] {
  def apply(req: Request): Future[Response] = {
    val valueFuture: Future[Int] = Store.getValue()
    val value: Int = Await.result(valueFuture)
    val response: Response = buildResponse(value)
    Future.value(response)
  }
}

然而,事实证明这是灾难性的,因为在重负载时,对旧服务的请求会停留在 Await.result() 调用中。新的异步服务显示没有问题。

该问题似乎与线程耗尽和/或未来池有关。我们已经找到了几个关于如何使用自定义池(例如FuturePool )从异步调用中进行同步调用(执行 I/O)的解决方案,但不是相反,这是我们的案例。

那么,在 Finagle 中从同步代码调用异步代码(执行 I/O)的推荐方式是什么?

4

2 回答 2

1

您可以做的最简单的事情是使用返回 Future 的线程池包装您的同步调用。Twitter 的 util-core 提供了FuturePool实用程序来实现这一点。

类似的东西(未经测试的代码):

import com.twitter.util.FuturePool

val future = FuturePool.unboundedPool {
  val result = myBlockingCall.await()
  result
}
于 2016-08-30T15:09:24.670 回答
1

您可以使用 FuturePool,它们是在缓存线程池之上运行的期货,但为什么要这样做,让服务返回一个承诺,并在您从商店类完成未来时设置承诺的值。

val p: Promise[Response] = Promise[Response]()
val value: Future[Int] = Store.getValue()
value onSuccess {x => 
  val result: Response = buildResponse(x)
  p.setValue(result)
}
p 
于 2017-03-28T12:56:53.007 回答