0

我是 RxJava 的新手,如果有人能澄清这一点,那就太好了……

考虑以下流程:

  1. 每次订阅者请求一个值时生成/发出一个整数。
  2. 对生成的整数进行分组,以便每个值构成自己的组。注意:目标是模拟输入中存在大量组的情况。
  3. 现在对于每个组:
  • 将值收集到一系列 1 秒长的块中,并在第一个空块到达时停止。注意:在现实世界中,每组会有多个值,但这里每组恰好有 2 个缓冲区:1 个带有单个 int 的缓冲区和 1 个空缓冲区,这将有效地完成序列。
  • 将所有非空块(在本例中为 1)合并到一个列表中。
  • 将结果列表打印到标准输出中并再请求 1 个元素。

上述流程的原始实现:

import static java.util.stream.Collectors.toList;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import org.reactivestreams.Subscription;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public static void main(String[] args) {
final AtomicInteger ai = new AtomicInteger();
Flowable<Integer> flowable =
        Flowable.generate(emitter -> emitter.onNext(ai.getAndIncrement()));
flowable.groupBy(value -> value, value -> value, false, 1)
        .flatMap(grouped -> grouped.buffer(1, TimeUnit.SECONDS)
                .takeWhile(list -> !list.isEmpty())
                .reduce((list1, list2
                ) -> Stream.concat(list1.stream(), list2.stream()).collect(toList()))
                .toFlowable(), false, Integer.MAX_VALUE, 1)
        .subscribe(
                new FlowableSubscriber<>() {
                    private Subscription s;

                    @Override
                    public void onSubscribe(@NonNull Subscription s) {
                        this.s = s;
                        s.request(1);
                    }

                    @Override
                    public void onNext(List<Integer> groupedValues) {
                        assert groupedValues.size() == 1;
                        System.out.println(groupedValues);
                        s.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(
                                "Done.");
                    }
                }); }

如您所见,我只创建了 1 个自定义订阅者,它在创建订阅时请求 1 个项目,并且在每次处理列表时再请求 1 个。虽然flatMap'smaxConcurrency设置为Integer.MAX_VALUE(因为我想处理尽可能多的组) - 所有其他“提示”(例如 groupBy 和 flatMap's bufferSize)都设置为 1 并且没有其他缓冲(例如onBackpressureBuffer,“缓冲有限数量的来自当前的项目Flowable并允许它尽可能快地发射”)被请求。


所以问题是onNext's 的调用次数(因此,它请求一个值)远低于我传递给的 lambda 调用次数Flowable.generate- 大多数时候超过 40,000 次,并且我希望它至少与通过Subscrtiption.request. 正如我可以通过 lambda 的调用堆栈判断的那样,每次创建一个组时,它都会调用io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy.GroupBySubscriber#onSubscribe,然后调用org.reactivestreams.Subscription#request传递bufferSize(在上面的代码片段中配置为 1 )给它,所以这个过程几乎是自我延续的,所以说话。我不能指定bufferSize为 0,我也不能限制同时处理的组数:有限maxConcurrency值很快导致MissingBackpressureException...

我是否在这里遗漏了一些基本的东西,或者没有办法在这样的用例中真正应用背压?

提前致谢。

4

0 回答 0