1

我很难找到一个优雅的解决方案来链接一些期货。我尝试实现的方法如下所示(它是缓存的一部分):

def acquire(key: A, producer: => Future[B]): Future[B]

算法大致是这样的:

  • 如果密钥被锁定,立即通过运行时异常(是否会提前使用 Future.failed?)
  • 否则打开一个future { getOrRefresh }块,因为它需要一些时间来检索密钥
  • getOrRefresh要么返回一个顺子B,然后是未来和方法的结果
  • 或者它需要运行producer

最后一步意味着我需要从 future内部对未来进行“扁平化” 。也就是说,我不能做 a outer.flatMap,所以我猜策略是使用Await.

现在,Await有一个奇怪的精神分裂症,你可以得到一个Option[Try[B]], using 方法ready,或者 unwound B, using result。问题是,我需要Failure在完成外部未来之前释放一个锁,所以我必须坚持Await.ready,我猜。

这变得丑陋:

val fut   = producer
val prod  = Await.ready(fut, Duration.Inf).value.get
if (prod.isFailure) sync.synchronized { locked = false }
prod.get

能有这么丑吗?必须有更好的方法来做到这一点。


所以重复一遍:在Future[B]运行中,一些同样以 完成的对等未来B,并返回对等的结果,但在失败的情况下,在完成主要未来之前清理锁。

4

2 回答 2

0

我对您的实施进行了一些更改。

首先,我已将您的首字母throw锁定为Future.failed,因为期货的消费者应该可以安全地假设唯一需要注意的失败是失败的未来。

接下来,我没有调用from ,而是get将它作为未来的结果返回。然后我得到结果,以便我可以替换在(map 将导致 Future[Future[B]])的情况下产生的未来。在 的情况下,我从值返回 a ,因为需要返回未来。Option[B]readEntryflatMapproducerNoneSomeFuture.successfulflatMap

最后,我用 an替换了recoverand因为我们希望失败的传播并且只想将副作用链接到未来以解锁条目。throwandThenFuture

trait Cache[A, B] {
  class Entry(var locked: Boolean = true)

  private var map  = Map.empty[A, Entry]
  private val sync = new AnyRef

  def readEntry(key: A): Option[B] = ???

  def acquire(key: A, producer: => Future[B]): Future[B] = sync.synchronized {
    map.get(key) match {
      case Some(e) =>
        if (e.locked)
          Future.failed(new IllegalStateException())
        else {
        e.locked = true
        future { readEntry(key)}.flatMap {
          case None => producer.andThen {
            case Failure(_) => sync.synchronized(e.locked = false)
          }
          case Some(value) => Future.successful(value)
        }
      }

      case _ => producer.map { value =>
        sync.synchronized {
          map += key -> new Entry()
        }
        value
      }
    }
  }
}
于 2015-04-23T20:55:17.930 回答
0

这里尝试使用recover(With)并因此避免Await. 不过,它看起来很笨重,因为我需要重新抛出异常

import concurrent._

trait Cache[A, B] {
  class Entry(var locked: Boolean = true)

  private var map  = Map.empty[A, Entry]
  private val sync = new AnyRef

  implicit def exec: ExecutionContext

  def readEntry(key: A): Option[B]

  def acquire(key: A, producer: => Future[B]): Future[B] = sync.synchronized {
    map.get(key) match {
      case Some(e) =>
        if (e.locked) throw new IllegalStateException()
        e.locked     = true
        val existing = future { readEntry(key).get }
        val refresh  = existing.recoverWith {
          case _: NoSuchElementException => producer
        }
        refresh.recover {
          case t => sync.synchronized(e.locked = false); throw t
        }

      case _ => producer.map { value =>
        sync.synchronized {
          map += key -> new Entry()
        }
        value
      }
    }
  }
}

如果您有建议,请将它们作为单独的答案发布。

于 2013-04-02T23:09:49.080 回答