我把这个问题拆解开来，并模拟了一个示例。先把整个流程分成若干小块，再把其中的几块组合起来，就得到了一个解决方案。这未必是最好的解决方案，但在有人给出更好的方案之前，下面的 Groovy 代码是可以正常工作的。
// Debug helper: subscribes to an observable and prints every signal it produces, prefixed with a label.
//   name       - label placed in front of each printed line
//   observable - the stream to watch
// Returns whatever subscribe () returns (it is the last expression of the method).
def oPrint (String name, Observable observable) {
    // one handler per kind of signal: emitted value, failure, and normal completion
    def onValue = { value -> println "$name: $value" }
    def onFailure = { exception -> println "Exception ($name): ${exception.message}" }
    def onEnd = { -> println "$name: ended" }
    observable.subscribe (onValue, onFailure, onEnd)
}
// source of the work: the report ids 1 through 10, published so the stages below share one run
def reports = Observable.range (1, 10).publish ()
// generate every report on the io scheduler and wrap its outcome in a 'success' record
// (reports 5 and 6 block for five seconds first, standing in for slow reports)
def work = reports
    .flatMap (
        { report ->
            // emits the report itself as its "result", then completes
            def produce = { subscriber ->
                if (report in [5, 6]) Thread.sleep 5_000L
                subscriber.onNext report
                subscriber.onCompleted ()
            }
            Observable.create (produce).subscribeOn (Schedulers.io ())
        },
        // pairs the report id with what its inner observable emitted
        { report, outcome -> [value: report, status: 'success', result: outcome] }
    )
    .publish ()
// a second published view of work with no time limit applied, so it receives every report;
// intended for storing results so they can be viewed later without redoing the work
def persist = work.publish ()
// Reports that finish within 3 seconds of this stage being connected are passed through;
// after that the stream simply completes.
// takeUntil + timer gives one overall deadline. The previous timeout (3, SECONDS) restarted its
// window after every emitted item, so it only limited the gap between two reports and a steady
// trickle of results could keep the stream open indefinitely.
def timed = work
    .takeUntil (Observable.timer (3, SECONDS))
    // as before, an upstream error is turned into a normal completion instead of being propagated
    .onErrorResumeNext (Observable.empty ())
    .publish ()
// reports that were requested but never showed up in timed, each tagged with a 'timeout' status
def delta = Observable.combineLatest (
    reports.toList (),
    timed.map { result -> result.value }.toList ()
) { requested, finished ->
    // list difference: ids present in reports but absent from timed
    (requested - finished).collect { report -> [value: report, status: 'timeout'] }
}
    // unpack the single emitted list into one emission per record
    .flatMap { pending -> Observable.from (pending) }
    .publish ()
// the "online" view: everything from timed first, followed by the timeout records from delta
def online = Observable.concat (timed, delta).publish ()
// output final contents
// (both printers subscribe here, before any connect () call below)
oPrint "persist", persist
oPrint "online", online
// start all of the connectable observables
// connect () is called from the most downstream stage (online) back to the source (reports),
// so every stage is connected before the stage it reads from.
// NOTE(review): the pipeline presumably relies on this order, since publish () does not replay
// items emitted before a subscriber attaches — confirm before reordering these calls.
online.connect ()
delta.connect ()
timed.connect ()
persist.connect ()
work.connect ()
reports.connect ()
// wait until all observables have completed
// (a fixed 10-second pause, longer than the 5-second sleep of the slow reports in work;
// nothing here actually waits on a completion signal)
Thread.sleep 10_000L
/*** start of output ***
online: [value:7, status:success, result:7]
persist: [value:7, status:success, result:7]
online: [value:1, status:success, result:1]
persist: [value:1, status:success, result:1]
online: [value:2, status:success, result:2]
persist: [value:2, status:success, result:2]
online: [value:3, status:success, result:3]
persist: [value:3, status:success, result:3]
online: [value:4, status:success, result:4]
persist: [value:4, status:success, result:4]
online: [value:8, status:success, result:8]
persist: [value:8, status:success, result:8]
online: [value:9, status:success, result:9]
persist: [value:9, status:success, result:9]
online: [value:10, status:success, result:10]
persist: [value:10, status:success, result:10]
online: [value:5, status:timeout]
online: [value:6, status:timeout]
online: ended
persist: [value:5, status:success, result:5]
persist: [value:6, status:success, result:6]
persist: ended
*** end of output ***/