有很多方法可以解决这个问题,让我们看一个简单的方法。在此示例中,我假设事件A
和B
属于同一类型。我还假设事件C
(tied to A
) 和D
(tied to D
) 也是可观察的,没有任何重要信息。
var streamA = new Subject<string>();
var streamB = new Subject<string>();
var switchToA = new Subject<Unit>();
var switchToB = new Subject<Unit>();
现在在这里我们在触发时打开一个streamA
窗口,并在触发switchToA
时关闭它switchToB
,并对 执行相反的操作streamB
,然后将它们合并:
public IObservable<string> SwitchingWindows(IObservable<string> streamA, IObservable<string> streamB, IObservable<Unit> switchToA, IObservable<Unit> switchToB)
{
var switchedA = streamA.Window(switchToA, _ => switchToB).Concat();
var switchedB = streamB.Window(switchToB, _ => switchToA).Concat();
return switchedA.Merge(switchedB);
}
订阅:
var all = SwitchingWindows(streamA, streamB, switchToA, switchToB);
all.Subscribe(x => Console.WriteLine(x));
测试:
streamA.OnNext("a1"); // skip
switchToA.OnNext(Unit.Default); // switch A
streamA.OnNext("a2"); // shown
streamB.OnNext("b1"); // skip
streamA.OnNext("a3"); // shown
switchToB.OnNext(Unit.Default); // switch B
streamB.OnNext("b2"); // shown
streamB.OnNext("b3"); // shown
streamA.OnNext("a4"); // skip
switchToA.OnNext(Unit.Default); // switch A
streamA.OnNext("a5"); // shown
streamB.OnNext("b4"); // skip
streamB.OnNext("b5"); // skip
switchToB.OnNext(Unit.Default); // switch B
streamB.OnNext("b6"); // shown
streamA.OnNext("a6"); // skip
输出如预期:
a2
a3
b2
b3
a5
b6
最终流all
是 100% 干净的。