1

我正在为 Futures 编写 scala <-> java interop 包装器,但我不知道实现 scala.concurrent.Future.onComplete 的正确方法(http://www.scala-lang.org/api/current/index.html)。 html#scala.concurrent.Future)。这可能有效:

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    @tailrec
    def run = value match {
      case Some(t) => func(t)
      case None => { Thread.sleep(100); run }
    }
  })
}

但是带有期货的 Scala 中的异步 IO表明,当我必须阻止时,我应该将代码的相关部分传递给 scala.concurrent.blocking 以让 ExecutionContext 知道发生了什么。问题是,当我用阻塞 {} 包围值 match{...} 时,它不再是尾调用。

众所周知的正确方法是什么?

编辑:为了完整起见,这里是整个包装类:

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T]) extends scala.concurrent.Future[T] {
  def isCompleted = jf.isDone

  def result(atMost: Duration)(implicit permit: CanAwait): T =
    atMost match { case Duration(timeout, units) => jf.get(timeout, units) }

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
    executor.execute(new Runnable {
      @tailrec
      def run = value match {
        case Some(t) => func(t)
        case None => { Thread.sleep(100); run }
      }
    })
  }

  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = atMost match {
    case Duration(timeout, units) => {
      jf.get(timeout, units)
      this
    }
  }

  def value: Option[Try[T]] = (jf.isCancelled, jf.isDone) match {
    case (true, _) => Some(Failure(new Exception("Execution was cancelled!")))
    case (_, true) => Some(Success(jf.get))
    case _ => None
  }
}
4

2 回答 2

2

我只是等待 Java 的未来完成:

import scala.util.{Try, Success, Failure}
import scala.concurrent._
import java.util.concurrent.TimeUnit

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T])
  extends scala.concurrent.Future[T] {
  ...

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
    executor.execute(new Runnable {
      def run: Unit = {
        val result = Try(blocking(jf.get(Long.MaxValue, TimeUnit.MILLISECONDS)))
        func(result)
      }
    })
  ...
}
于 2013-08-22T15:32:49.710 回答
0

嗯,我对 0__ 答案的编辑没有得到批准,所以为了未来的读者,这是我要使用的解决方案(从 0__ 简化)

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    def run = func(Try( blocking { jf.get } ))
  })
}
于 2013-08-22T16:19:28.193 回答