我试图弄清楚如何使用 Observable.groupBy 来限制在一段时间内按键推送的元素数量。我最终得到以下构造:
create(emitter -> {
while (true) {
publishedMeter.mark();
emitter.onNext(new Object());
}
})
.window(1000L, TimeUnit.MILLISECONDS)
.flatMap(window -> window.groupBy(o -> o.hashCode() % 10_000).flatMapMaybe(Observable::lastElement))
.subscribe(e -> receivedMeter.mark());
虽然 subscribe 的 onNext 回调被调用了几千次,但我认为这应该意味着 flatMapMaybe 确实正确订阅了所有 GroupedObservableSource。过了一会儿,RxComputationThreadPool 中的一个线程,但我不明白我错过了什么