5

我想我不完全理解 scalaz Futures 是如何工作的。我正在尝试将一个项目从 scala futures 移植到 scalaz 实现,但问题是 scalaz Future 的性能较低。最简单的示例是使用 Spray 在身份验证请求中加载配置文件。

函数本身:

def loadProfile[A: CollectionProvider: JsonFormat](id: String) = future {
  remote.findOne[A]("id" :> id) match {
    case Some(profile) ⇒ \/-(profile)
    case None          ⇒ -\/(ProfileNotFoundRejection(id))
  }
}

scalaz 版本仅在一个符号上有所不同,我Future.applyscalaz.concurrent. 现在加载一些 html 页面的 Spray 路由:

get {
  path("profile" / "id" ~ Segment) { id ⇒
    onSuccess(loadProfile[User](id)) {
      case \/-(profile) ⇒ complete(html.page(profile))
      case -\/(pnfr)    ⇒ reject(pnfr)
    }
  }
}

loadProfile一样,scalaz 版本的不同之处仅在于方法调用:

get {
  path("profile" / "id" ~ Segment) { id ⇒
    ctx => loadProfile[User](id).runAsync {
      case \/-(profile) ⇒ ctx.complete(html.page(profile))
      case -\/(pnfr)    ⇒ ctx.reject(pnfr)
    }
  }
}

但是 scala Future 版本的请求在(大约)143ms完成,而 scalaz 版本在260ms完成。所以我不太关心这个特定的请求,而是一般的异步执行和服务的可扩展性,正如我在 scalaz Future 中理解的那样,我必须手动将执行分叉到一个单独的线程,所以它按顺序执行?scalaz 未来的用法有什么好的介绍/教程吗?

4

1 回答 1

1

scala 和 scalaz 期货非常不同:

斯卡拉

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits._

// creating two slow futures:
val f: Future[Unit] = Future { println("f " + Thread.currentThread().getName()); Thread.sleep(10000);  }
val g: Future[Unit] = Future { println("g " + Thread.currentThread().getName()); Thread.sleep(10000);  }

// and after moment asking for success
f onSuccess { case _ => println("f s1 " + Thread.currentThread().getName()) }
g onSuccess { case _ => println("g s1 " + Thread.currentThread().getName()) }
f onSuccess { case _ => println("f s2 " + Thread.currentThread().getName()) }
g onSuccess { case _ => println("g s2 " + Thread.currentThread().getName()) }

我们得到一个输出,在创建之后f立即g

f ForkJoinPool-1-worker-5
g ForkJoinPool-1-worker-3

约10秒后休息输出

f s1 ForkJoinPool-1-worker-5
g s1 ForkJoinPool-1-worker-5
f s2 ForkJoinPool-1-worker-5
g s2 ForkJoinPool-1-worker-5

斯卡拉兹

import scalaz.concurrent._ // z!
import scala.concurrent.ExecutionContext.Implicits._

// creating two slow futures:
val f: Future[Unit] = Future { println("f " + Thread.currentThread().getName()); Thread.sleep(10000);  }
val g: Future[Unit] = Future { println("g " + Thread.currentThread().getName()); Thread.sleep(10000);  }

创建fand之后g,什么也没有发生。我们有:

f: scalaz.concurrent.Future[Unit] = Async(<function1>)
g: scalaz.concurrent.Future[Unit] = Async(<function1>)

但是在运行它们之后,我们看到了不同之处:

f runAsync { _ => println("f s1 " + Thread.currentThread().getName()) }
g runAsync { _ => println("g s1 " + Thread.currentThread().getName()) }
f runAsync { _ => println("f s2 " + Thread.currentThread().getName()) }
g runAsync { _ => println("g s2 " + Thread.currentThread().getName()) }

我们得到结果:

f pool-4-thread-2
g pool-4-thread-1
f pool-4-thread-4
g pool-4-thread-3

f s2 pool-4-thread-4
g s2 pool-4-thread-3
g s1 pool-4-thread-1
f s1 pool-4-thread-2

有两点值得一提:

  • 期货fg再次执行。没有价值记忆。
  • 回调在与第runAsync一次计算相同的线程中执行。这是因为我们没有明确分叉。

很难说为什么它们在您的示例中表现不同。remove.findOne无论如何,大部分时间都应该花在上面。您想使用scala.concurrent.blockingaround 阻塞调用来帮助ExecutorService不遇到线程饥饿(在这两种情况下)。

于 2015-01-15T13:05:08.163 回答