1

假设我有一个函数fab: A => Future[B]并希望它返回一个在截止日期之前完成的新未来。所以我正在写一个这样的新deadlined函数

def deadlined[B](fut: => Future[B], deadline: Deadline): Future[B] = ???

现在我正在使用java.util.Timer但可以ScheduledThreadPoolExecutor按照建议使用。最好的解决方案可能是一个包装器,用于抽象调度实现并按照评论中的建议在测试中模拟它。

object Deadlined {

  private val timer = new java.util.Timer() // todo: replace it with a wrapper

  def apply[B](fut: => Future[B], deadline: Deadline)(implicit ec: ExecutionContext): Future[B] = {
    val promise = Promise[B]()
    val timerTask = new java.util.TimerTask {
      override def run(): Unit = promise.failure(new Exception(s"$deadline is exceeded"))
    }
    timer.schedule(timerTask, deadline.timeLeft.toMillis)
    fut.transform { result =>
      timerTask.cancel()
      result match {
        case Success(b) => promise.success(b)
        case Failure(t) => promise.failure(t)
      }
      result
    }
    promise.future
  }
}

是否有意义 ?我还想知道如何从对我之前的问题的回答中找出一个共同的部分Deadlined延迟。

4

1 回答 1

2

我可能会做类似于以下的事情,所以我可以为任何 Future(YMMV、Caveat Emptor 等)添加截止日期:

import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration.FiniteDuration
import java.util.{Timer, TimerTask}

implicit class DeadlineFuture[T](future: Future[T]) {
  def deadline(d: FiniteDuration)(implicit timer: Timer): Future[T] = {
    if (future.isCompleted) future
    else {
      val promise = Promise[T]()
      val timerTask = new TimerTask {
        override def run(): Unit = promise.tryFailure(new Exception(s"$d is exceeded"))
      }
      timer.schedule(timerTask, d.toMillis)
      future.onComplete(_ => timerTask.cancel())(ExecutionContext.parasitic)
      promise.completeWith(future).future
    }
  }
}

// Usage:
Future.never.deadline(5.seconds).onComplete(println)
于 2021-04-10T19:57:57.777 回答