我有一个我订阅了两次的 PublishSubject。第一个订阅者只计算处理的项目数,这个值总是与我通过观察者发送的值相匹配。但是,另一个订阅者正在使用缓冲区,而我经常 (75%) 没有收到通过观察者的所有项目。我是否使用错误的缓冲区?在我停止发送给观察者以确保所有项目都得到处理后,我等待的时间超过了时间跨度。
Integer downloads1 = 0;
Integer downloads2 = 0;
PublishSubject<Object> subject = PublishSubject.create();
// this subscriber count matches the expected
subject.subscribe(s -> {
synchronized (downloads1) {
downloads1 += 1;
}
});
// this subscriber seems to miss items about 75% of the time
subject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> {
synchronized (downloads2) {
downloads2 += list.size();
}
});