57

Java Futurecancel方法,可以中断运行Future任务的线程。例如,如果我在 a 中包装一个可中断的阻塞调用,Java Future我可以稍后中断它。

Scala Future没有提供任何cancel方法。假设我将一个可中断的阻塞调用包装在Scala Future. 我怎么能打断它?

4

4 回答 4

33

这还不是Futures API 的一部分,但将来可能会作为扩展添加。

作为一种解决方法,您可以使用firstCompletedOf包装 2 个期货 - 您要取消的期货和来自自定义的期货Promise。然后,您可以通过使承诺失败来取消由此创建的未来:

def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
  val p = Promise[T]
  val first = Future firstCompletedOf Seq(p.future, f)
  val cancellation: () => Unit = {
    () =>
      first onFailure { case e => customCode}
      p failure new Exception
  }
  (cancellation, first)
}

现在你可以在任何未来调用它来获得一个“可取消的包装器”。示例用例:

val f = callReturningAFuture()
val (cancel, f1) = cancellable(f) {
  cancelTheCallReturningAFuture()
}

// somewhere else in code
if (condition) cancel() else println(Await.result(f1))

编辑:

有关取消的详细讨论,请参阅《学习 Scala 并发编程》一书的第 4 章。

于 2013-04-15T07:50:59.857 回答
10

我没有对此进行测试,但这扩展了 Pablo Francisco Pérez Hidalgo 的答案。我们没有阻塞等待 java Future,而是使用中间体Promise

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Promise}
import scala.util.Try

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()

  def future = promise.future

  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  ) {
    override def done() = promise.complete(Try(get()))
  }

  def cancel(): Unit = jf.cancel(true)

  executionContext.execute(jf)
}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}
于 2016-10-11T20:45:26.700 回答
8

通过取消我猜你想猛烈地打断future.

找到这段代码:https ://gist.github.com/viktorklang/5409467

做了一些测试,似乎工作正常!

享受 :)

于 2013-10-23T10:40:56.797 回答
3

我认为可以通过使用Java 7Future接口及其实现来降低实现的复杂性。

Cancellable可以构建一个 Java 未来,该未来将被其cancel方法取消。另一个future可以等待它的完成,从而成为可观察的接口,它本身的状态是不可变的:

 class Cancellable[T](executionContext: ExecutionContext, todo: => T) {

   private val jf: FutureTask[T] = new FutureTask[T](
     new Callable[T] {
       override def call(): T = todo
     }
   )

   executionContext.execute(jf)

   implicit val _: ExecutionContext = executionContext

   val future: Future[T] = Future {
     jf.get
   }

   def cancel(): Unit = jf.cancel(true)

 }

 object Cancellable {
   def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
     new Cancellable[T](executionContext, todo)
 }
于 2016-03-01T13:18:45.687 回答