5

我想向after(d: FiniteDuration)(callback: => Unit)Scala 添加一个 util 来Future使我能够做到这一点:

val f = Future(someTask)

f.after(30.seconds) {
  println("f has not completed in 30 seconds!")
}

f.after(60.seconds) {
  println("f has not completed in 60 seconds!")
}

我怎样才能做到这一点?

4

4 回答 4

1

通常我使用线程池执行器并承诺:

import scala.concurrent.duration._
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor}
import scala.concurrent.{Future, Promise}

val f: Future[Int] = ???

val executor = new ScheduledThreadPoolExecutor(2, Executors.defaultThreadFactory(), AbortPolicy)

def withDelay[T](operation: ⇒ T)(by: FiniteDuration): Future[T] = {
  val promise = Promise[T]()
  executor.schedule(new Runnable {
    override def run() = {
      promise.complete(Try(operation))
    }
  }, by.length, by.unit)
  promise.future
}

Future.firstCompletedOf(Seq(f, withDelay(println("still going"))(30 seconds)))
Future.firstCompletedOf(Seq(f, withDelay(println("still still going"))(60 seconds)))
于 2016-06-09T22:45:23.553 回答
0

像这样的东西,也许:

  object PimpMyFuture {
     implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal {
        def after(delay: FiniteDuration)(callback: => Unit): Future[T] = {
           Future {
             blocking { Await.ready(f, delay) }
           } recover { case _: TimeoutException => callback }
           f
        }
     } 
  }

  import PimpMyFuture._
  Future { Thread.sleep(10000); println ("Done") }
    .after(5.seconds) { println("Still going") }

这个实现很简单,但它基本上使您需要的线程数量增加了一倍——每个活动的未来有效地占用两个线程——这有点浪费。或者,您可以使用计划任务使您的等待无阻塞。我不知道scala中的“标准”调度程序(每个库都有自己的),但是对于这样的简单任务,您可以TimerTask直接使用java:

object PimpMyFutureNonBlocking {    
 val timer = new java.util.Timer

 implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal {
    def after(delay: FiniteDuration)(callback: => Unit): Future[T] = {
       val task = new java.util.TimerTask {
          def run() { if(!f.isCompleted) callback }
       }
       timer.schedule(task, delay.toMillis)
       f.onComplete { _ => task.cancel }
       f
    }
  } 
}
于 2016-06-10T01:36:07.407 回答
0

一种方法是使用Future.firstCompletedOf(参见这篇博文):

val timeoutFuture = Future { Thread.sleep(500); throw new TimeoutException }

val f = Future.firstCompletedOf(List(f, timeoutFuture))
f.map { case e: TimeoutException => println("f has not completed in 0.5 seconds!") }

哪里TimeoutException是一些异常或类型。

于 2016-06-09T22:33:51.803 回答
0

使用import akka.pattern.after. 如果你想在没有 akka 的情况下实现它,这里是源代码。另一个(java)示例TimeoutFuturecom.google.common.util.concurrent.

于 2016-06-09T22:37:45.030 回答