8

有一些很好的提示如何将期货与超时结合起来。但是我很好奇如何使用Future 序列 sequenceOfFutures

我的第一种方法看起来像这样

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

object FutureSequenceScala extends App {
  println("Creating futureList")

  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  val results = Await result (waitingList, timeout * futures.size)
  println(results)

  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

有没有更好的方法来处理一系列期货中的超时,或者这是一个有效的解决方案?

4

3 回答 3

8

您可能需要重新考虑您的代码中的一些内容。对于初学者来说,我不太喜欢将任务提交到ExecutionContext唯一目的是模拟超时并Thread.sleep在其中使用过的任务。调用是阻塞的sleep,您可能希望避免在执行上下文中有一个纯粹阻塞的任务,以便等待固定的时间。我将在这里窃取我的答案,并建议对于纯粹的超时处理,您应该使用我在该答案中概述的内容。这HashedWheelTimer是一个高效的计时器实现,它比只是休眠的任务更适合超时处理。

现在,如果你走那条路,我建议的下一个更改涉及处理每个未来的单个超时相关故障。如果您希望单个失败Future使调用返回的聚合完全失败sequence,那么什么都不做。如果您不希望这种情况发生,而是希望超时返回一些默认值,那么您可以使用这样recover的:Future

withTimeout(someFuture).recover{
  case ex:TimeoutException => someDefaultValue
}

完成后,您可以利用非阻塞回调并执行以下操作:

waitingList onComplete{
  case Success(results) => //handle success
  case Failure(ex) => //handle fail
}

每个未来都有一个超时,因此不会无限运行。没有必要 IMO 在那里阻止并通过atMost参数提供额外的超时处理层 to Await.result。但我想这假设你对非阻塞方法没问题。如果你真的需要阻止那里,那么你不应该等待timeout * futures.size大量的时间。这些期货并行运行;那里的超时时间应该只需要与期货本身的各个超时时间一样长(或者稍微长一点,以解决 cpu/计时的任何延迟)。它当然不应该是超时 * 期货总数。

于 2013-07-16T12:29:36.983 回答
1

这是一个显示您的阻塞有多糟糕的版本fallback

请注意,执行程序是单线程的,并且您正在创建许多后备。

@cmbaxter 是对的,您的主超时不应该是timeout * futures.size,它应该更大!

@cmbaxter 也是正确的,您想考虑非阻塞。一旦你这样做了,并且你想施加超时,那么你将为此选择一个计时器组件,查看他的链接答案(也从你的链接答案链接)。

也就是说,我仍然喜欢您的链接中的回答,因为坐在循环中等待下一个应该超时的事情真的很简单。

它只需要一个期货列表及其超时和一个备用值。

也许有一个用例,例如一个简单的应用程序,它只是阻止某些结果(如您的测试)并且在结果出现之前不得退出。

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

import java.util.concurrent.Executors
import java.lang.System.{ nanoTime => now }

object Test extends App { 
  //implicit val xc = ExecutionContext.global
  implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor)

  def timed[A](body: =>A): A = {
    val start = now 
    val res = body
    val end = now
    Console println (Duration fromNanos end-start).toMillis + " " + res
    res
  }
  println("Creating futureList")

  val timeout = 1500 millis
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      timed {
        blocking(Thread sleep ms)
        ms toString
      }
    } 
    Future firstCompletedOf Seq(f, fallback(timeout))
  }   

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  timed {
  val results = Await result (waitingList, 2 * timeout * futures.size)
  println(results)
  }     
  xc.shutdown

  def fallback(timeout: Duration) = future {
    timed {
      blocking(Thread sleep (timeout toMillis))
      "-1"
    }
  }   
}   

发生了什么:

Creating futureList
Creating waitinglist
Created
1001 1000
1500 -1
1500 1500
1500 -1
1200 1200
1500 -1
800 800
1500 -1
2000 2000
1500 -1
List(1000, 1500, 1200, 800, 2000)
14007 ()
于 2013-07-16T16:07:50.583 回答
0

Monix Task 有超时支持:

  import monix.execution.Scheduler.Implicits.global
  import monix.eval._
  import scala.concurrent.duration._

  println("Creating futureList")
  val tasks = List(1000, 1500, 1200, 800, 2000).map{ ms =>
    Task {
      Thread.sleep(ms)
      ms.toString
    }.timeoutTo(2.seconds, Task.now("-1"))
  }

  println("Creating waitinglist")
  val waitingList = Task.gather(tasks) // Task.sequence is true/literally "sequencing" operation

  println("Created")
  val results = Await.result(waitingList, timeout * futures.size)
  println(results)
于 2018-05-16T09:09:42.590 回答