我有一些像这样Flowable
组成的FlowableTransformer
:
public static FlowableTransformer<I, O> transformer() {
return f -> {
return Flowable.mergeArray(5, 5, f.filter(...).map(...), f.filter(...).map(...));
}
}
这里的基本用例是具有某种形式的分支逻辑- 如果元素是 A ( filter
) 则执行 B ( map
) 或者如果它们是 C 则执行 D。
但是,在下游,当我存储Disposable
and 稍后调用cancel
它时,取消只会在流中冒泡,直到Flowable
返回mergeArray
. 其上游,即呼叫上游的运营商compose(transformer())
,从未收到取消。
所以我认为这本身就是一个问题Flowable.merge()
(或者更确切地说:我缺乏理解),所以我改用 areplay()
和 a ConnectableFlowable
:
public static FlowableTransformer<I, O> transformer() {
return f -> {
ConnectableFlowable<TaggedUpdates> cf = taggedUpdateFlowables.replay(5);
Flowable<O> o = f
.filter(...)
.map(...)
.concatWith(
cf
.filter(...)
.map(...)
);
cf.connect();
return o;
};
}
然而——同样的事情也会发生。cancellation
只冒泡到ConnectableFlowable
.
但是,我需要在整个 Flowable 上进行取消,以便项目的来源停止生产。
有没有办法做到这一点?或者我应该以不同的方式处理分支逻辑?