1

我必须使用三个单独的 API 请求来获取三种类型的数据( AType, BType, )。CTypeAPI 返回的对象是一对多的:

  • 1 个AType对象是 N 个对象的父BType对象
  • 1 个BType对象是 P 个对象的父CType对象)

我正在使用以下三个函数来获取每种类型:

func get_A_objects() -> Observable<AType> { /* code here */ }
func get_B_objects(a_parentid:Int) -> Observable<BType> { /* code here */}
func get_C_objects(b_parentid:Int) -> Observable<CType> { /* code here */}

并且为了避免嵌套订阅,这三个函数使用链接flatMap

func getAll() -> Observable<CType> {
  return self.get_A_objects()
     .flatMap { (aa:AType) in  return get_B_objects(aa.id) }
     .flatMap { (bb:BType) in  return get_C_objects(bb.id) }
}

func setup() {
  self.getAll().subscribeNext { _ in
    print ("One more item fetched") 
  }
}

上面的代码工作正常,当有 M 个对象时AType,我可以看到"One more item fetched"打印 MxNxP 次的文本。

我想设置该功能getAll()以使用. 我最初的想法是写如下内容:ReplaySubject<String>

func getAll() -> ReplaySubject<String> {
  let msg = ReplaySubject<String>.createUnbounded()
  self.get_A_objects().doOnNext { aobj in msg.onNext ("Fetching A \(aobj)") }
    .flatMap { (aa:AType) in 
       return get_B_objects(aa.id).doOnNext { bobj in msg.onNext ("Fetching B \(bobj)") }
    }
    .flatMap { (bb:BType) in
       return get_C_objects(bb.id).doOnNext { cobj in msg.onNext ("Fetching C \(cobj)") }
    }

  return msg
}

但此尝试失败,即以下内容print()不打印任何内容。

getAll().subscribeNext {
  print ($0)
}

我应该如何重写我的逻辑?

4

1 回答 1

1

问题

这是因为你没有保留你Disposable的 s,所以它们被立即释放,因此什么也不做。

getAll中,您创建了一个Observable<AType>via get_A_objects(),但它并未添加到DisposeBag. 当它超出范围时(在 结束时func),它将被释放。所以{ aobj in msg.onNext ("Fetching A \(aobj)") }永远不会发生(或者至少不可能,如果它是异步的)。

此外,您也没有保留ReplaySubject<String>getAll().subscribeNext任何一个返回的。因此,出于同样的原因,这也将是一个交易破坏者。

解决方案

由于您需要两个Observables:一个用于实际最终结果(Observable<CType>),一个用于进度状态(ReplaySubject<String>),您应该从您的getAll()函数中返回两者,以便两者都可以“拥有”,并对其生命周期进行管理。

func getAll() -> (Observable<CType>, ReplaySubject<String>) {
    let progress = ReplaySubject<String>.createUnbounded()
    let results = self.get_A_objects()......
    return (results, progress)
}

let (results, progress) = getAll()

progress
    .subscribeNext {
        print ($0)
    }
    .addDisposableTo(disposeBag)

results
    .subscribeNext {
        print ($0)
    }
    .addDisposableTo(disposeBag)

一些注意事项:

  • 您不需要使用createUnbounded,如果您不小心,这可能会很危险。
  • 您可能根本不想使用ReplaySubject,因为如果有人在之后订阅并获得旧的进度状态消息,那么说您稍后“获取”某些东西是骗人的。考虑使用PublishSubject.
  • 如果您遵循上述建议,那么您只需要确保您progress之前订阅,results以确保您不会错过任何进度状态消息,因为输出将不再被缓冲。
  • 另外,这只是我的意见,但我会将“Fetching X Y”改成其他词,因为您不是“fetching”,但您已经“fetch ed ”了它。
于 2016-07-06T08:56:51.193 回答