2

我正在使用 Observable.Merge 组合多个序列并在 UI 的单个视图中显示。用户可以选择在 UI 中添加或删除序列(提要)。虽然我使用 Merge 来合并提要。我不确定如何将 IObservable 从合并的序列中分离出来。目前我正在创建一个全新的 IObservable,省略了我想要的提要。是否可以动态添加和删除 ViewModel 已经订阅的 IObsevable?

4

3 回答 3

1

看看 usingIObservable<IObservable<T>>然后使用Merge. 这会自动允许您通过结束内部来删除序列IObservable<T>。简单的。

于 2012-11-24T08:50:51.370 回答
1

像这样的东西也可以,但我怀疑有更简洁的方法。

class Merger<T>
{
    Subject<T> _merged = new Subject<T>();

    public IObservable<T> Merged { get { return _merged; } }

    public IDisposable Add(IObservable<T> newStream)
    {
        return newStream.Subscribe(_merged);
    }
}

要从合并的流中删除某些内容,请处置 IDisposable。

于 2012-11-25T23:57:24.663 回答
0

这可能有助于您开始,但在重新连接时,因为 b 是冷的,b 将从头开始重新启动:

var a = Observable.Generate('A', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(200)).Select(x => "a: " + x).Publish();
var b = Observable.Generate('a', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(500)).Select(x => "b: " + x).Publish();

var merged = a.Merge(b).Publish();
var submerged = merged.Subscribe(x => x.Dump());

var subA = a.Connect();
var subB = b.Connect();
merged.Connect();

Task.Delay(2000).ContinueWith(t => subB.Dump("Disposing b.").Dispose());
Task.Delay(4000).ContinueWith(t => b.Connect()).ContinueWith(_ => "Reconnected to b");

编辑:

要向合并的 IO 添加另一个“c”:

var a = Observable.Generate('A', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(200)).Select(x => "a: " + x).Publish();
var b = Observable.Generate('a', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(500)).Select(x => "b: " + x).Publish();
var c = Observable.Generate('1', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(100)).Select(x => "c: " + x).Publish();

var merged = a.Merge(b).Merge(c).Publish();
var submerged = merged.Subscribe(x => x.Dump());

var subA = a.Connect();
var subB = b.Connect();
merged.Connect();

Task.Delay(2000).ContinueWith(t => subB.Dump("Disposing b.").Dispose());
Task.Delay(4000).ContinueWith(t => b.Connect()).ContinueWith(_ => "Reconnected to b".Dump());
Task.Delay(6000).ContinueWith(t => c.Connect()).ContinueWith(_ => "Connecting to c".Dump());
于 2012-11-23T15:24:17.233 回答