0

我有一些像这样Flowable组成的FlowableTransformer

public static FlowableTransformer<I, O> transformer() {
    return f -> {
        return Flowable.mergeArray(5, 5, f.filter(...).map(...), f.filter(...).map(...));
    }
}

这里的基本用例是具有某种形式的分支逻辑- 如果元素是 A ( filter) 则执行 B ( map) 或者如果它们是 C 则执行 D。

但是,在下游,当我存储Disposableand 稍后调用cancel它时,取消只会在流中冒泡,直到Flowable返回mergeArray. 其上游,即呼叫上游的运营商compose(transformer()),从未收到取消。

所以我认为这本身就是一个问题Flowable.merge()(或者更确切地说:我缺乏理解),所以我改用 areplay()和 a ConnectableFlowable

public static FlowableTransformer<I, O> transformer() {
    return f -> {
        ConnectableFlowable<TaggedUpdates> cf = taggedUpdateFlowables.replay(5);
        
        Flowable<O> o = f
            .filter(...)
            .map(...)
            .concatWith(
                    cf
                        .filter(...)
                        .map(...)
            );
        
        cf.connect();
        return o;
    };
}

然而——同样的事情也会发生。cancellation只冒泡到ConnectableFlowable.

但是,我需要在整个 Flowable 上进行取消,以便项目的来源停止生产。

有没有办法做到这一点?或者我应该以不同的方式处理分支逻辑?

4

1 回答 1

0

这是我目前正在解决的问题。而不是使用合并,或者replay我已将其更改为仅使用flatMap

public static FlowableTransformer<I, O> transformer() {
    return f -> {
        return f
            .flatMap(i -> {
                return Flowable.just(i.items)
                     .filter(...)
                     .map(...)
                     .concatWith(
                         .flatMap(i -> {
                             Flowable.just(i.items)
                                 .filter(...)
                                 .map(...)
                         })
                     );
               });
    };
}

但是,稍后我有点担心我将无法在我确实需要使用的地方执行此解决方法replay,例如上游的计算成本很高。

于 2020-06-26T09:28:30.073 回答