0

我有一个我订阅了两次的 PublishSubject。第一个订阅者只计算处理的项目数,这个值总是与我通过观察者发送的值相匹配。但是,另一个订阅者正在使用缓冲区,而我经常 (75%) 没有收到通过观察者的所有项目。我是否使用错误的缓冲区?在我停止发送给观察者以确保所有项目都得到处理后,我等待的时间超过了时间跨度。

Integer downloads1 = 0;
Integer downloads2 = 0;
PublishSubject<Object> subject = PublishSubject.create();
// this subscriber count matches the expected
subject.subscribe(s -> {
  synchronized (downloads1) {
    downloads1 += 1;
  }
});
// this subscriber seems to miss items about 75% of the time
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> {
  synchronized (downloads2) {
    downloads2 += list.size();
  }
});
4

1 回答 1

0

也许你遇到了这个错误:https ://github.com/Netflix/RxJava/issues/534

顺便说一句,您应该使用reduce(R initialValue, Func2<R,? super T,R> accumulator)初始值为 0 而不是订阅,然后您不需要自己进行任何同步。

于 2014-04-17T10:31:16.013 回答