我是响应式编程领域的新手,我正在尝试使用 rxjava 2 创建一个简单的背压感知消息处理。
以下是我试图实现的工作流程:
Flowable 的连续字符串流。
执行耗时操作并将消息更改为另一个字符串
执行另一个耗时的操作。
现在我正在使用以下代码:
{
Flowable.create(subscriber -> {
some_stream.forEach(data -> {
subscriber.onNext(data);
});
}, BackpressureStrategy.BUFFER).
subscribeOn(Schedulers.io()). // Data emission will run io scheduler
observeOn(Schedulers.computation()). // Map operation will run on computation scheduler
map(val -> Time_Consuming_Task(val)). // Task returns another string
observeOn(Schedulers.io()). / Next consumer will run on computation scheduler
subscribe(val -> Another_Time_Consuming_Task(val));
}
现在对于小型操作,我没有看到任何与背压相关的问题。
但是对于大型流,我不知道它会如何表现。
现在我的问题是:-
在BackpressureStrategy.BUFFER的情况下,默认缓冲区大小是多少?数据在哪里被缓冲?
如果我想在每个耗时的任务之前创建两个背压缓冲区,我应该使用onBackpressureBuffer 运算符吗?
如果缓冲区已满,我不想丢失数据,我想等待或在那种情况下?