我想将外部流程执行表示为Observable[String]
, where String
- 来自流程输出的行。这是我所做的示例,它有效:
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
object TestSo {
def main(args: Array[String]): Unit = {
val lineStream = scala.sys.process.Process("python3 test.py").lineStream
val lineStreamO: Observable[String] = Observable.fromIterator(Task(lineStream.iterator))
.doOnNext(l => Task(println(l))) //logging
.guarantee(Task(println("clean resources")))
println(lineStreamO.toListL.runSyncUnsafe())
}
}
您可以看到,该过程每秒都会发出新行。但这无关紧要。只需提供完整的示例,test.py
:
from time import sleep
print(0, flush=True)
sleep(1)
print(1, flush=True)
sleep(1)
print(2, flush=True)
sleep(1)
print(3, flush=True)
sleep(1)
print(4, flush=True)
输出:
0
1
2
3
4
5
clean resources
List(0, 1, 2, 3, 4, 5)
问题:
我想要超时 - 如果进程冻结(例如sleep 100000
)进程应该在超时后被杀死。此外,如果进程强制或失败,则应清理一些资源(guarantee
例如)。非零退出代码应该代表失败。
如何像Observable[String]
正确的错误处理一样实现流程执行?rx-java
欢迎解决方案。