0

如果我创建了一个包含十个项目的 Observable,其中三个项目超过了超时阈值,那么我怎样才能产生一个最终结果,其中包含所有已完成的项目和未完成的项目分区以形成完整的报告。

以下 Groovy 代码通过应用 15 秒的不活动超时来工作,但仅包含已完成工作的数据,使用户没有任何有关丢失项目状态的信息。

getListOfReports ()
.flatMap { report -> getReport_Async (report) }
.timeout (15, SECONDS)
.onErrorResumeNext (Observable.empty ())

函数 getListOfReports 返回一个 Observable(同步 - 来自数据库)。函数 getReport_Async 返回一个 Observable(来自多个数据库的异步 sql)。

我怀疑将涉及一个主题,它将持有第二个订阅,并且会在超时事件之后以某种方式产生差异操作。在不采取涉及副作用的极端措施的情况下,我正在努力解决这个问题。

4

1 回答 1

0

我分解了这个问题并模拟了一个例子。通过将整个过程分成小块,我将其中的一些块放在一起形成一个解决方案。这可能不是最好的解决方案,但在发布更好的解决方案之前,以下 Groovy 代码可以工作。

// utility function to print values emitted by observable, exception notification, and observable completion
def oPrint (String name, Observable observable) {
  observable
  .subscribe (
    { value -> println "$name: $value" }
  , { exception -> println "Exception ($name): ${exception.message}" }
  , { -> println "$name: ended" }
  )
}

// obtain list of reports to generate
def reports = Observable.range (1, 10)
.publish ()

// produce reports (some will take awhile)
def work = reports
.flatMap (
  { report -> 
    Observable.create { subscriber ->
      if (report in [5, 6]) Thread.sleep 5_000L
      subscriber.onNext report
      subscriber.onCompleted ()
    }
    .subscribeOn (Schedulers.io ())
  }
, { original, derived -> [value: original, status: 'success', result: derived] }
)
.publish ()

// all reports will be persisted for later viewing without redoing the work 
def persist = work
.publish ()

// reports completed before a timeout will be passed through
def timed = work
.timeout (3, SECONDS)
.onErrorResumeNext (Observable.empty ())
.publish ()

// determine which reports did not complete prior to timeout
def delta = Observable.combineLatest (
  reports.toList ()
, timed.map { result -> result.value }.toList ()
) { full, partial ->
  Observable.from (full - partial)
}
.flatMap { observable -> observable }
.map { report -> [value: report, status: 'timeout'] }
.publish ()

// combine partially completed list with delta list with timeout status 
def online = Observable.concat (
  timed
, delta
)
.publish ()

// output final contents 
oPrint "persist", persist
oPrint "online", online

// start all of the connectable observables 
online.connect ()
delta.connect ()
timed.connect ()
persist.connect ()
work.connect ()
reports.connect ()

// wait until all observables have completed 
Thread.sleep 10_000L

/*** start of output ***

online: [value:7, status:success, result:7]
persist: [value:7, status:success, result:7]
online: [value:1, status:success, result:1]
persist: [value:1, status:success, result:1]
online: [value:2, status:success, result:2]
persist: [value:2, status:success, result:2]
online: [value:3, status:success, result:3]
persist: [value:3, status:success, result:3]
online: [value:4, status:success, result:4]
persist: [value:4, status:success, result:4]
online: [value:8, status:success, result:8]
persist: [value:8, status:success, result:8]
online: [value:9, status:success, result:9]
persist: [value:9, status:success, result:9]
online: [value:10, status:success, result:10]
persist: [value:10, status:success, result:10]
online: [value:5, status:timeout]
online: [value:6, status:timeout]
online: ended
persist: [value:5, status:success, result:5]
persist: [value:6, status:success, result:6]
persist: ended

*** end of output ***/
于 2016-02-02T03:36:29.927 回答