只有当您使用阻塞来获取Future
. 如果你想使用非阻塞回调onComplete
,onSuccess
或者onFailure
,那么你将不得不滚动你自己的超时处理。Akka 为参与者之间的请求/响应 ( ?
) 消息传递内置了超时处理,但不确定您是否要开始使用 Akka。FWIW,在 Akka 中,对于超时处理,它们Futures
通过 组合在一起Future.firstCompletedOf
,一个代表实际的异步任务,一个代表超时。如果超时计时器(通过 a HashedWheelTimer
)首先弹出,则异步回调失败。
一个非常简单的滚动你自己的例子可能是这样的。首先,一个用于调度超时的对象:
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException
object TimeoutScheduler{
val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
def scheduleTimeout(promise:Promise[_], after:Duration) = {
timer.newTimeout(new TimerTask{
def run(timeout:Timeout){
promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))
}
}, after.toNanos, TimeUnit.NANOSECONDS)
}
}
然后是一个函数来获取 Future 并向其添加超时行为:
import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration
def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
val prom = Promise[T]()
val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
fut onComplete{case result => timeout.cancel()}
combinedFut
}
请注意,HashedWheelTimer
我在这里使用的是来自 Netty。