1

如何处理异步执行任务期间的失败?即至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入> 5

val things = Range(1, 40)
  implicit val scheduler = monix.execution.Scheduler.global
  def t(i:Int) = Task.eval {
      Try{
        Thread.sleep(1000)
        val result = i + 1
        if(result > 5){
          throw new Exception("asdf")
        }
        // i.e. write to file, that's why unit is returned
        println(result) // Effect
        "Result"
      }
    }
    val futures = things.map(e=> t(e))
  futures.foreach(_.runToFuture)

编辑

试:

futures.foreach(_.runToFuture.onComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.err.println(ex)
      System.exit(1)
  })

不会停止计算。如何记录堆栈跟踪并取消正在进行的计算并停止?

4

2 回答 2

1

一种更惯用的方法是使用Observable,而不是Task因为它正在处理数据列表(我假设这是用例,因为它显示在问题中)。

 val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
    else Task.delay(println(i)) // Or write to file in your case
  )
  .completedL
  .runToFuture


obs
  .recover {
    case NonFatal(e) => println("Error")
  }

或者,您也可以发出错误信号,Either从而提高类型安全性,因为您需要处理Either结果。

val obs = Observable
  .fromIterable(Range(1, 40))
  .mapEval(i =>
    if (i + 1 > 5) Task.pure(Left("Error"))
    else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
  )
  .takeWhileInclusive(_.isRight) // will also emit the failing result
  .lastL
  .runToFuture


obs.map {
  case Left(err) => println("There's an error")
  case _ => println("Completed successfully")
}
于 2019-11-16T20:57:32.913 回答
0

这个问题有两个部分:

  • 使任务可取消。
  • 当一项任务失败时取消兄弟姐妹。

使任务可取消

Monix 具有BooleanCancelable,它允许您在isCancelled调用true时设置结果cancel

cancel运行时也需要调用Thread.interrupt唤醒它Thread.sleep。否则,sleep意志将自始至终。但是,这将在您的任务中引发InterruptedException 。这需要处理。

取消兄弟姐妹

CompositeCancelable。它似乎是从父任务CompositeCancellable调用的用例。cancel因此,一旦CompositeCancellable构建(即构建所有任务):

  • 必须为每个任务提供对此的引用,以便失败的任务可以调用cancel它。(注意这是一种循环引用,最好避免)
  • 或者,当子任务失败并调用cancel. (这将避免循环引用)

另一种通知同级任务的方法是使用AtomicBoolean并经常检查它(睡眠 10 毫秒而不是 1000 毫秒)。当一个任务失败时,它会设置这个布尔值,以便其他任务可以停止执行。这当然不涉及Cancellable。(这是一种 hack,最好使用 monix 调度程序)

笔记

Thread.sleep在 a 内调用是个好主意Task吗?我认为这会阻止另一个任务使用该线程。我认为使用调度程序添加延迟并组合这些子任务是最有效地利用线程池的方法。

于 2019-10-22T00:45:06.630 回答