0
   // 1 fixed thread

   implicit val waitingCtx = scala.concurrent.ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

    // "map" will use waitingCtx

    val ss = (1 to 1000).map {n => // if I change it to 10 000 program will be stopped at some point, like locking forever
      service1.doServiceStuff(s"service ${n}").map{s =>
        service1.doServiceStuff(s"service2 ${n}")
      }
    }

每个doServiceStuff(name:String)需要 5 秒。doServiceStuff 没有隐式 ex:Execution 上下文作为参数,它在内部使用自己的 ex 上下文并Future {blocking { .. }}对其执行。

最后程序打印:

took: 5.775849753 seconds for 1000 x 2 stuffs

如果我将 1000 更改为10000,添加更多任务:val ss = (1 to 10000)然后程序停止:

将打印约 17 027 行(共 20 000 行)。不会打印“错误”消息。不会打印“接受”消息

**并且不会再处理任何事情。

但是,如果我将 exContext 更改为ExecutionContext.fromExecutor(null: Executor)(global one),那么 in 会在大约 10 秒内结束(但通常不会)。

~17249 lines printed
ERROR: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
took: 10.646309398 seconds

这就是问题所在:为什么使用固定的前上下文池它会在没有消息传递的情况下停止,但对于全局前上下文它会终止但有错误和消息传递?

有时..它是不可重现的。

更新:我确实看到了"ERROR""took"如果我将池从 1 增加到 N。不管 N 有多高 - 它仍然会是错误。

代码在这里:https ://github.com/Sergey80/scala-samples/tree/master/src/main/scala/concurrency/apptmpl

在这里,doManagerStuff2()

4

1 回答 1

0

我想我知道发生了什么。如果你足够眯着眼睛,你会发现map职责非常轻:只需启动一个新的未来(因为 doServiceStuff 是一个未来)。我敢打赌,如果您切换到flatMap,行为将会改变,这实际上会使嵌套的未来变平,因此将等待第二次doServiceStuff调用完成。

由于您没有平展这些期货,因此您下游的所有等待都在等待错误的事情,而您没有抓住它,因为在这里您丢弃了任何服务返回。


更新

好的,我误解了你的问题,尽管我仍然认为嵌套的 Future 是一个错误。

当我尝试使用具有 10000 个任务的两个执行器的代码时,我确实OutOfMemoryForkJoin执行上下文中创建线程(即用于service任务)时得到了,这是我所期望的。您是否使用了任何特定的内存设置?

他们都成功完成了 1000 个任务。

于 2015-09-13T07:16:33.927 回答