0

所以我对 Scala 和 RX 都很陌生。最了解并实际编写此代码的人刚刚离开,我不确定发生了什么。这个结构遍布他的代码,我不太清楚它在做什么:

def foo(List[Long]) : Observable[Unit] =
  Observable {
    subscriber => {
      do some stuff
      subscriber.onNext()
      subscriber.onCompleted()
    }

我主要得到do some stuff, 以及给订户的电话。我不明白的是,它subscriber来自哪里?是否subscriber => {实例化订阅者?是什么Observable { subscriber => { ... } }意思?

4

2 回答 2

1

如果您查看Observable 伴随对象文档,您将看到一个apply采用 type 函数的方法(Subscriber[T]) ⇒ Unit。因此,当您调用 时Observable{withSomeLambda},这与调用相同Observable.apply{withSomeLambda}

而且,如果您一直查看源代码,您会发现这确实在返回

toScalaObservable(rx.Observable.create(f))

f你传入的 lambda在哪里。

所以,subscriber只是 lambda 的参数。它由该函数的调用者传入。

于 2015-02-04T05:07:45.107 回答
1

此代码正在创建一个新的,如此Observable所述。

基本上,当下游组件订阅此流时,会调用此回调。在回调中,我们确定作为数据源的何时调用onNext(v: T)哪一个是我们将生成的每个元素传递给它们的方式,以及何时调用onCompleted()哪一个是我们告诉订阅者我们已完成发送数据的方式。

一旦你创建了一个,Observable你就可以开始调用Observable 操作符,这将导致另一个,复合Observable,或者会导致一个终止条件,这将结束该过程,并且通常会导致流的最终结果(通常是一个集合或聚合价值)。

您不会List在问题中使用 ,但通常如果您想从列表中创建一个反应流,您会调用Observable.from().

PS:我认为这是 RxJava 代码。

于 2015-02-04T05:08:10.220 回答