9

这段代码是线程安全的吗?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do somethind with sequence of lists
    ...
  });

我很好奇,因为 ArrayList 不是线程安全的数据结构。

4

2 回答 2

6

作为一个快速的答案,在 .NET(原始 Rx 实现)中,可以假设来自可观察序列的所有值都是连续的。这并不排除它是多线程的。但是,如果您以多线程方式生成值,那么您可能希望通过查找 .NET Synchronize()Rx 运算符的等效函数来强制执行顺序性质。

另一种选择是检查ScanRxJava 源代码中的实现,以验证它确实强制执行您希望/期望为您的累加器函数提供安全性的顺序性质。

于 2013-09-29T19:19:28.843 回答
4

如果这段代码不是线程安全的,那么要么 RxJava 被破坏,要么你的 Observable 源被破坏——操作符不可重入是 Rx 合约的一部分。

于 2013-09-30T05:49:41.833 回答