1

我正在寻找诸如 Switch 之类的扩展,但不是默认为最新的流,而是需要一个带有额外变量的开关,该变量可以根据另一个外部事件来回切换。

例如。我们有 observable,它是 2 个流的合并AB当检测到事件C时,我希望 observableA通过,并且流B被忽略(丢失)。当检测到事件时,我希望删除并传递D可观察的流。AB

什么是最好的 linq 运算符?我想我可以结合使用 where 和一些自定义解决方案,但确实在寻找更优雅的解决方案。

此外,我想尽可能避免从流中订阅/重新订阅。我只是发现订阅/重新订阅通常很难跟踪,而且很难链接运营商。

4

1 回答 1

3

有很多方法可以解决这个问题,让我们看一个简单的方法。在此示例中,我假设事件AB属于同一类型。我还假设事件C(tied to A) 和D(tied to D) 也是可观察的,没有任何重要信息。

var streamA = new Subject<string>();
var streamB = new Subject<string>();

var switchToA = new Subject<Unit>();
var switchToB = new Subject<Unit>();

现在在这里我们在触发时打开一个streamA窗口,并在触发switchToA时关闭它switchToB,并对 执行相反的操作streamB,然后将它们合并:

public IObservable<string> SwitchingWindows(IObservable<string> streamA, IObservable<string> streamB, IObservable<Unit> switchToA, IObservable<Unit> switchToB)
{                                       
    var switchedA = streamA.Window(switchToA, _ => switchToB).Concat();
    var switchedB = streamB.Window(switchToB, _ => switchToA).Concat();

    return switchedA.Merge(switchedB);
}

订阅:

    var all = SwitchingWindows(streamA, streamB, switchToA, switchToB);
    all.Subscribe(x => Console.WriteLine(x));

测试:

streamA.OnNext("a1");           // skip
switchToA.OnNext(Unit.Default); // switch A
streamA.OnNext("a2");           // shown
streamB.OnNext("b1");           // skip
streamA.OnNext("a3");           // shown
switchToB.OnNext(Unit.Default); // switch B
streamB.OnNext("b2");           // shown
streamB.OnNext("b3");           // shown
streamA.OnNext("a4");           // skip
switchToA.OnNext(Unit.Default); // switch A
streamA.OnNext("a5");           // shown
streamB.OnNext("b4");           // skip
streamB.OnNext("b5");           // skip
switchToB.OnNext(Unit.Default); // switch B
streamB.OnNext("b6");           // shown
streamA.OnNext("a6");           // skip

输出如预期:

a2
a3
b2
b3
a5
b6

最终流all是 100% 干净的。

于 2012-05-02T00:32:50.240 回答