5

我试图理解语句背后的理性 对于绝对需要阻止的情况,可以阻止期货(尽管不鼓励)

背后的想法ForkJoinPool是加入阻塞操作的进程,这是期货和参与者的执行者上下文的主要实现。它应该对阻止连接有效。

我写了一个小基准,在这个非常简单的场景中,旧式期货(scala 2.9)似乎快了 2 倍。

@inline
  def futureResult[T](future: Future[T]) = Await.result(future, Duration.Inf)

  @inline
  def futureOld[T](body: => T)(implicit  ctx:ExecutionContext): () => T = {
    val f = future(body)
    () => futureResult(f)
  }

  def main(args: Array[String]) {
    @volatile

    var res = 0d
    CommonUtil.timer("res1") {
      (0 until 100000).foreach {  i =>
       val f1 = futureOld(math.exp(1))
        val f2 = futureOld(math.exp(2))
        val f3 = futureOld(math.exp(3))
        res = res + f1() + f2() + f3()
      }
    }
    println("res1 = "+res)
    res = 0

    res = 0
    CommonUtil.timer("res1") {
      (0 until 100000).foreach {  i =>
        val f1 = future(math.exp(1))
        val f2 = future(math.exp(2))
        val f3 = future(math.exp(3))
        val f4 = for(r1 <- f1; r2 <- f2 ; r3 <- f3) yield r1+r2+r3
        res = res + futureResult(f4)
      }
    }
    println("res2 = "+res)
  }



start:res1
res1 - 1.683 seconds
res1 = 3019287.4850644027
start:res1
res1 - 3.179 seconds
res2 = 3019287.485058338
4

2 回答 2

10

Futures 的大部分意义在于它们使您能够创建可以轻松并行执行的非阻塞并发代码。

好的,所以在将来包装一个可能很长的函数会立即返回,这样你就可以推迟担心返回值,直到你真正对它感兴趣。但是,如果确实关心值的代码部分只是阻塞直到结果实际可用,那么您所获得的只是一种使您的代码更整洁的方法(而且您知道,您可以在没有期货的情况下做到这一点 - 使用期货我认为整理你的代码将是一种代码气味)。除非包装在 future 中的函数绝对是微不足道的,否则您的代码将花费比评估其他表达式更多的时间阻塞。

另一方面,如果您注册了一个回调(例如,使用onCompleteonSuccess)并将关心结果的代码放入该回调中,那么您可以拥有可以组织起来非常有效地运行并很好地扩展的代码。它成为事件驱动的,而不是必须坐下来等待结果。

您的基准测试属于前一种类型,但由于您在那里有一些微小的功能,因此与按顺序执行它们相比,并行执行它们几乎没有什么好处。这意味着您主要评估创建和访问期货的开销。恭喜:你证明了在某些情况下,2.9 期货在做一些琐碎的事情上比 2.10 更快——一些琐碎的事情并没有真正发挥出任何一个版本的概念的优势。

尝试一些更复杂和要求更高的东西。我的意思是,您几乎立即请求未来值!至少,您可以构建一个包含 100000 个期货的数组,然后在另一个循环中提取它们的结果。那将是测试一些稍微有意义的东西。哦,让他们根据i的值计算一些东西。

你可以从那里进步到

  1. 创建一个对象来存储结果。
  2. 向每个将结果插入对象的未来注册回调。
  3. 启动您的n计算

然后对实际结果需要多长时间进行基准测试,当您要求全部时。那会更有意义。

编辑

顺便说一句,您的基准测试在其自身条件和对正确使用期货的理解上都失败了。

首先,您计算的是检索每个单独的未来结果所需的时间,但不是计算所有 3 个期货后评估res所需的实际时间,也不是迭代循环所需的总时间。此外,您的数学计算是如此微不足道,以至于您实际上可能正在测试 a) for理解和 b) 包含前三个期货的第四个未来的第二个测试中的惩罚。

其次,这些总和可能与所使用的总时间大致成比例的唯一原因正是因为这里实际上没有并发。

我不是要打你,只是基准测试中的这些缺陷有助于阐明问题。对不同期货实现的性能进行适当的基准测试需要非常仔细的考虑。

于 2013-09-10T09:31:05.963 回答
6

ForkJoinTask报告的 Java7 文档:

ForkJoinTask 是 Future 的轻量级形式。ForkJoinTasks 的效率源于一组限制(仅部分静态可执行),反映了它们作为计算任务计算纯函数或在纯孤立对象上操作的预期用途。主要的协调机制是 fork(),它安排异步执行,以及 join(),直到计算出任务的结果才继续。计算应避免同步方法或块,并且应尽量减少其他阻塞同步,除了加入其他任务或使用同步器(如被宣传为与 fork/join 调度合作的 Phaser)。任务也不应该执行阻塞 IO,并且理想情况下应该访问完全独立于其他正在运行的任务访问的变量。轻微违反这些限制,例如使用共享输出流,在实践中可能是可以容忍的,但频繁使用可能会导致性能下降,并且如果未等待 IO 或其他外部同步的线程数耗尽,则可能会无限期停止。此使用限制部分是通过不允许抛出检查异常(例如 IOExceptions)来强制执行的。但是,计算可能仍然会遇到未经检查的异常,这些异常会被重新抛出给试图加入它们的调用者。这些异常可能还包括源自内部资源耗尽的 RejectedExecutionException,例如未能分配内部任务队列。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。

Doug Lea 的JSR166维护存储库(针对 JDK8)对此进行了扩展:

ForkJoinTask 是 Future 的轻量级形式。ForkJoinTasks 的效率源于一组限制(仅部分静态可执行),反映了它们作为计算任务的主要用途,计算纯函数或对纯孤立对象进行操作。主要的协调机制是 fork(),它安排异步执行,以及 join(),直到计算出任务的结果才继续。理想情况下,计算应避免同步方法或块,并应尽量减少其他阻塞同步,除了加入其他任务或使用同步器(如被宣传为与 fork/join 调度合作的 Phaser)。可细分任务也不应该执行阻塞 I/O,并且理想情况下应该访问完全独立于其他正在运行的任务访问的变量。这些准则通过不允许抛出诸如 IOExceptions 之类的检查异常来松散地执行。但是,计算可能仍然会遇到未经检查的异常,这些异常会被重新抛出给试图加入它们的调用者。这些异常可能还包括源自内部资源耗尽的 RejectedExecutionException,例如未能分配内部任务队列。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。这些准则通过不允许抛出诸如 IOExceptions 之类的检查异常来松散地执行。但是,计算可能仍然会遇到未经检查的异常,这些异常会被重新抛出给试图加入它们的调用者。这些异常可能还包括源自内部资源耗尽的 RejectedExecutionException,例如未能分配内部任务队列。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。这些准则通过不允许抛出诸如 IOExceptions 之类的检查异常来松散地执行。但是,计算可能仍然会遇到未经检查的异常,这些异常会被重新抛出给试图加入它们的调用者。这些异常可能还包括源自内部资源耗尽的 RejectedExecutionException,例如未能分配内部任务队列。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。计算可能仍然会遇到未经检查的异常,这些异常会被重新抛出给试图加入它们的调用者。这些异常可能还包括源自内部资源耗尽的 RejectedExecutionException,例如未能分配内部任务队列。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。计算可能仍然会遇到未经检查的异常,这些异常会被重新抛出给试图加入它们的调用者。这些异常可能还包括源自内部资源耗尽的 RejectedExecutionException,例如未能分配内部任务队列。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。重新抛出的异常与常规异常的行为方式相同,但在可能的情况下,包含启动计算的线程和实际遇到异常的线程的堆栈跟踪(例如使用 ex.printStackTrace() 显示的);至少只有后者。

可以定义和使用可能阻塞的 ForkJoinTasks,但这样做需要进一步考虑三个方面: (1) 完成少数任务(如果有的话)应该依赖于阻塞外部同步或 I/O 的任务。从未加入的事件式异步任务(例如,那些继承 CountedCompleter 的任务)通常属于这一类。(2)为了尽量减少资源影响,任务应该很小;理想情况下只执行(可能)阻塞操作。(3) 除非使用 ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的 ForkJoinPool.getParallelism() 级别,否则池不能保证有足够的线程可用以确保进度或良好的性能.

tl;博士;

不要将 fork-join 所指的“阻塞连接”操作与调用任务中的某些“阻塞代码”相混淆。

第一个是关于协调许多独立任务(不是独立线程)以收集单个结果并评估整体结果。

第二个是关于在单个任务中调用潜在的长阻塞操作:例如网络上的 IO 操作、数据库查询、访问文件系统、访问全局同步的对象或方法......

FuturesScala和ForkJoinTasks两者都不鼓励第二种阻塞。主要风险是线程池耗尽并且无法完成队列中等待的任务,而所有可用线程都忙于等待阻塞操作。

于 2013-09-10T15:25:38.393 回答