64

从官方教程参考文献中我并不完全理解期货的一个方面。http://docs.scala-lang.org/overviews/core/futures.html

scala中的期货是否具有某种内置的超时机制?假设下面的示例是一个 5 GB 的文本文件……“Implicits.global”的隐含范围最终会导致 onFailure 以非阻塞方式触发还是可以定义?如果没有某种默认超时,这是否意味着成功或失败都不会触发?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}
4

14 回答 14

73

只有当您使用阻塞来获取Future. 如果你想使用非阻塞回调onCompleteonSuccess或者onFailure,那么你将不得不滚动你自己的超时处理。Akka 为参与者之间的请求/响应 ( ?) 消息传递内置了超时处理,但不确定您是否要开始使用 Akka。FWIW,在 Akka 中,对于超时处理,它们Futures通过 组合在一起Future.firstCompletedOf,一个代表实际的异步任务,一个代表超时。如果超时计时器(通过 a HashedWheelTimer)首先弹出,则异步回调失败。

一个非常简单的滚动你自己的例子可能是这样的。首先,一个用于调度超时的对象:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise:Promise[_], after:Duration) = {
    timer.newTimeout(new TimerTask{
      def run(timeout:Timeout){              
        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}

然后是一个函数来获取 Future 并向其添加超时行为:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

请注意,HashedWheelTimer我在这里使用的是来自 Netty。

于 2013-04-30T16:48:37.900 回答
26

所有这些答案都需要额外的依赖。我决定使用 java.util.Timer 编写一个版本,这是将来运行函数的有效方法,在这种情况下触发超时。

博客文章在此处提供更多详细信息

将它与 Scala 的 Promise 一起使用,我们可以创建一个带有超时的 Future,如下所示:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run() : Unit = {
            p.tryFailure(new TimeoutException())
        }
      }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}
于 2017-07-24T04:22:55.740 回答
23

我刚刚TimeoutFuture为同事创建了一个类:

超时未来

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}

用法

val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

笔记:

  • 假设播放!框架(但它很容易适应)
  • 每段代码都以相同的方式运行,ExecutionContext这可能并不理想。
于 2013-06-25T00:15:31.193 回答
6

Play 框架包含 Promise.timeout,因此您可以编写如下代码

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}
于 2014-12-25T10:14:29.700 回答
5

我很惊讶这在 Scala 中不是标准的。我的版本很短,没有依赖关系

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
  }

}

使用示例

import FutureTimeout._
Future { /* do smth */ } withTimeout
于 2015-04-10T14:50:49.670 回答
5

如果您希望编写者(承诺持有者)成为控制超时逻辑的人,请使用akka.pattern.after,方式如下:

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))

这样,如果你的 promise 完成逻辑永远不会发生,你的调用者的未来仍然会在某个失败的时候完成。

于 2017-02-26T12:17:30.747 回答
3

您可以在等待未来时指定超时时间:

对于scala.concurrent.Future,该result方法允许您指定超时。

对于scala.actors.Future,Futures.awaitAll让您指定超时。

我认为 Future 的执行没有内置超时。

于 2013-04-30T16:23:21.777 回答
3

还没有人提到akka-streams。流程有一个简单的completionTimeout方法,将其应用于单源流就像 Future 一样。

但是,akka-streams 也会取消,因此它实际上可以终止源运行,即它向源发出超时信号。

于 2016-09-21T13:40:45.353 回答
1

MonixTask有超时支持

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._
import scala.concurrent.TimeoutException

val source = Task("Hello!").delayExecution(10.seconds)

// Triggers error if the source does not complete in 3 seconds after runOnComplete
val timedOut = source.timeout(3.seconds)

timedOut.runOnComplete(r => println(r))
//=> Failure(TimeoutException)
于 2018-05-16T09:19:04.160 回答
1

此版本无需使用任何外部计时器即可工作(仅 Await.result)

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration

object TimeoutFuture {
    def apply[A](
        timeout: FiniteDuration
    )(block: => A)(implicit executor: ExecutionContext): Future[A] =
        try {
            Future { Await.result(Future { block }, timeout) }
        } catch {
            case _: TimeoutException => Future.failed(new TimeoutException(s"Timed out after ${timeout.toString}"))
        }
}
于 2019-02-07T13:24:12.083 回答
0

在 Future IMO 上指定超时的最简单方法是 scala 的内置机制,如果 Future 花费的时间超过指定的超时时间,scala.concurrent.Await.ready则会抛出一个。TimeoutException否则,它将返回 Future 本身。这是一个简单的人为示例

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
val f1: Future[Int] = Future {
  Thread.sleep(1100)
  5
}

val fDoesntTimeout: Future[Int] = Await.ready(f1, 2000 milliseconds)

val f: Future[Int] = Future {
  Thread.sleep(1100)
  5
}
val fTimesOut: Future[Int] = Await.ready(f, 100 milliseconds)
于 2019-04-25T12:30:20.150 回答
0
You can simply run the future to completion without giving any timeout interval by setting the timeout to infinite as below:

**import scala.concurrent.duration._  
Await.result(run(executionContext), Duration.Inf)**

run function can be as below :

def run(implicit ec: ExecutionContext) = {  
      val list = Seq(  
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")},  
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},  
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},  
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},  
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}  
      )  
      Future.sequence(list)  
    }  
于 2020-04-13T12:57:46.443 回答
0

我正在使用这个版本(基于上面的 Play 示例),它使用 Akka 系统调度程序:

object TimeoutFuture {
  def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
    implicit val executionContext = system.dispatcher

    val prom = Promise[A]

    // timeout logic
    system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future {
      try {
        prom success block
      } catch {
        case t: Throwable => prom tryFailure t
      }
    }

    prom.future
  }
}
于 2018-01-19T15:03:36.460 回答
0

您可以使用Await.

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

val meaningOfLife: Int = Await.result(Future(42), 1.nano)
println (meaningOfLife)

以上印刷品42

在这种情况下,您可能需要一个隐式ExecutionContext可用,只需添加:

import scala.concurrent.ExecutionContext.Implicits.global

另一种方法是使用Coevalfrom monix。此方法并非在所有情况下都有效,您可以在此处阅读所有相关信息。基本思想是,有时未来实际上并不需要任何时间,而是返回同步函数调用或值的结果,因此可以在当前线程上评估这个未来。这对于测试和模拟期货也很有用。此外,您不必指定预期的超时,但不必担心这一点仍然很好。

您首先将未来转变为 aTask并将该任务包装在 a 中,Coeval然后在等待看到您得到什么时交叉手指。这是一个非常简单的例子来展示它是如何工作的:

您需要一个隐式Scheduler才能使用它:

import monix.execution.Scheduler.Implicits.global


Coeval(Task.fromFuture(Future (42)).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

以上完成并打印42到控制台。

Coeval(Task.fromFuture(Future {
   scala.concurrent.blocking {
      42
   }
}).runSyncStep).value() match {
   case Right(v) => println(v)
   case Left(task) => println("Task did not finish")
}

此示例打印Task did not finish

于 2019-09-23T02:13:02.427 回答