我是 RxJava 的新手,如果有人能澄清这一点,那就太好了……
考虑以下流程:
- 每次订阅者请求一个值时生成/发出一个整数。
- 对生成的整数进行分组,以便每个值构成自己的组。注意:目标是模拟输入中存在大量组的情况。
- 现在对于每个组:
- 将值收集到一系列 1 秒长的块中,并在第一个空块到达时停止。注意:在现实世界中,每组会有多个值,但这里每组恰好有 2 个缓冲区:1 个带有单个 int 的缓冲区和 1 个空缓冲区,这将有效地完成序列。
- 将所有非空块(在本例中为 1)合并到一个列表中。
- 将结果列表打印到标准输出中并再请求 1 个元素。
上述流程的原始实现:
import static java.util.stream.Collectors.toList;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import org.reactivestreams.Subscription;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public static void main(String[] args) {
final AtomicInteger ai = new AtomicInteger();
Flowable<Integer> flowable =
Flowable.generate(emitter -> emitter.onNext(ai.getAndIncrement()));
flowable.groupBy(value -> value, value -> value, false, 1)
.flatMap(grouped -> grouped.buffer(1, TimeUnit.SECONDS)
.takeWhile(list -> !list.isEmpty())
.reduce((list1, list2
) -> Stream.concat(list1.stream(), list2.stream()).collect(toList()))
.toFlowable(), false, Integer.MAX_VALUE, 1)
.subscribe(
new FlowableSubscriber<>() {
private Subscription s;
@Override
public void onSubscribe(@NonNull Subscription s) {
this.s = s;
s.request(1);
}
@Override
public void onNext(List<Integer> groupedValues) {
assert groupedValues.size() == 1;
System.out.println(groupedValues);
s.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println(
"Done.");
}
}); }
如您所见,我只创建了 1 个自定义订阅者,它在创建订阅时请求 1 个项目,并且在每次处理列表时再请求 1 个。虽然flatMap
'smaxConcurrency
设置为Integer.MAX_VALUE
(因为我想处理尽可能多的组) - 所有其他“提示”(例如 groupBy 和 flatMap's bufferSize
)都设置为 1 并且没有其他缓冲(例如onBackpressureBuffer
,“缓冲有限数量的来自当前的项目Flowable
并允许它尽可能快地发射”)被请求。
所以问题是onNext
's 的调用次数(因此,它请求一个值)远低于我传递给的 lambda 调用次数Flowable.generate
- 大多数时候超过 40,000 次,并且我希望它至少与通过Subscrtiption.request
. 正如我可以通过 lambda 的调用堆栈判断的那样,每次创建一个组时,它都会调用io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy.GroupBySubscriber#onSubscribe
,然后调用org.reactivestreams.Subscription#request
传递bufferSize
(在上面的代码片段中配置为 1 )给它,所以这个过程几乎是自我延续的,所以说话。我不能指定bufferSize
为 0,我也不能限制同时处理的组数:有限maxConcurrency
值很快导致MissingBackpressureException
...
我是否在这里遗漏了一些基本的东西,或者没有办法在这样的用例中真正应用背压?
提前致谢。