1

我想从这样的sObservable数组中创建一个:Observable

package rxtest

import concurrent._
import concurrent.ExecutionContext.Implicits.global

import rx.lang.scala._
import rx.lang.scala.JavaConversions._
import rx.lang.scala.schedulers._

object A extends App {
    val ps = Array.fill(3)(Promise[Int]())
    val os = ps map {
            p => Observable from p.future observeOn NewThreadScheduler()
        }
    val v = rx.Observable.merge(os map toJavaObservable)
}

该程序无法编译,因为Observable有几个重载方法都被调用merge

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:15: overloaded method value merge with alternatives:
[error]   [T](x$1: Array[rx.Observable[_ <: T]])rx.Observable[T] <and>
[error]   [T](x$1: rx.Observable[_ <: rx.Observable[_ <: T]])rx.Observable[T] <and>
[error]   [T](x$1: Iterable[_ <: rx.Observable[_ <: T]])rx.Observable[T]
[error]  cannot be applied to (Array[rx.Observable[_ <: Int]])
[error]     val v = rx.Observable.merge(os map toJavaObservable)
[error]                           ^
[error] one error found

然后我想在另一个 Java 类的帮助下消除重载:

public class RxUtils {
    public final static <T> Observable<T> merge(Observable<? extends T>[] os) {
        return Observable.merge(os);
    }
}

Scala 代码变为(此处仅列出相关部分):

val ps = Array.fill(3)(Promise[Int]())
val os = ps map {
        p => Observable from p.future observeOn NewThreadScheduler()
    }
val v = RxUtils.merge(os map toJavaObservable)

该程序仍然无法编译:

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: no type parameters for method merge: (os: Array[rx.Observable[_ <: T]])rx.Observable[T] exist so that it can be applied to arguments (Array[rx.Observable[_ <: Int]])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : Array[rx.Observable[_ <: Int]]
[error]  required: Array[rx.Observable[_ <: ?T]]
[error]     val v = RxUtils.merge(os map toJavaObservable)
[error]                     ^
[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: type mismatch;
[error]  found   : Array[rx.Observable[_ <: Int]]
[error]  required: Array[rx.Observable[_ <: T]]
[error]     val v = RxUtils.merge(os map toJavaObservable)
[error]                              ^
[error] two errors found

我有三个问题:

  1. 如何merge像第一种情况一样用纯 Scala 调用方法?
  2. 为什么第二个程序无法编译?
  3. 如何在Scala中调用merge上述类中的方法?RxUtils
4

1 回答 1

2

我真的很困惑你在这里做什么。你为什么混合rx.Observablerx.lang.scala.Observable。只需选择其中之一:如果您在 Scala 中工作,请选择后者;如果您正在编写 Java 代码,请选择前者!

我还想向您指出这个页面,它比较了这两种Observable.

关于你的第一个程序,如果我是正确的类型psArray[Promise[Int]],那么os必须有类型Array[Observable[Int]]。如果您想将它们全部合并为一个Observable,您可以点击上面的链接并merge(Array<Observable<? extends T>>)在左栏中搜索。事实证明,您可以在 Scala 中将其编写为Observable.from(os).flattenos.toObservable.flatten.

关于第二个和第三个问题:我还没有真正检查过,但这可能与 Java 和 Scala 之间的协方差差异有关。可能帮助类型系统提供一些额外的类型信息就可以了。但我想,如果你只是停留在 Scala 语言中并按原样使用 RxScala 库,那么你根本不必处理这样的问题。

于 2016-04-14T21:59:55.343 回答