1

我有一个方法可以创建一个可观察的,它是多个其他可观察的合并。每个人都订阅某个符号以获取其数据:

var obs1 = dataSource
                .CreateObservable(someClass.Symbol1)
                .Select(result => ProcessData(result));

var obs2 = dataSource
                .CreateObservable(someClass.Symbol2)
                .Select(result => ProcessData(result));

var obs3 = dataSource
                .CreateObservable(someClass.Symbol3)
                .Select(result => ProcessData(result));

return Observable.Merge(obs1,obs2,obs3);


我订阅了合并的 observable,一切正常。现在,当任何符号发生变化时(someClass.Symbol 1/2/3),我希望重新创建可观察对象以使用新符号。当符号更改时,我会收到通知。

有没有办法使用 RX 让 observable 自动重新创建自己?也许使用 TakeUntil("symbol changed") 之类的东西然后做点什么?

不确定这是否可行,尤其是在使用合并的 observable 时。

4

2 回答 2

2

假设您可以创建一个IObservable<TSymbol> Symbol1Changed, 属性,该属性在每次更改时都会向您发送新符号,那么这样的事情将起作用:

var obs1 = someClass.Symbol1Changed
    .StartWith(someClass.Symbol1) // to start it with the current value before the first change
    .Select(newSymbol => dataSource.CreateObservable(newSymbol))
    .Switch()
    .Select(result => ProcessData(result);

var obs2 = ...
var obs3 = ...

return Observable.Merge(obs1, obs2, obs3);

如果您将 aBehaviorSubject用于您的Symbol1财产,那么您可以这样做:

var obs1 = someClass.Symbol1
    .Select(newSymbol => dataSource.CreateObservable(newSymbol))
    .Switch()
    .Select(result => ProcessData(result);
...
于 2013-09-10T19:22:04.243 回答
1

你当然可以用 Rx 做到这一点。这是它被设计用来做的事情。

您在这里真正描述的是,您将拥有三个要触发数据处理的符号来源。所以创建三个符号源:

var symbol1s = new Subject<Symbol>();
var symbol2s = new Subject<Symbol>();
var symbol3s = new Subject<Symbol>();

现在像这样定义每个可观察对象:

var obs1 =
    symbol1s
        .Select(symbol =>
            dataSource
                .CreateObservable(symbol)
                .Select(result => ProcessData(result)))
        .Switch();

这基本上是说随着每个出现的新符号切换到基于该符号的新可观察对象。

以相同的方式定义所有三个,然后像以前一样合并。

然后只需发出一个symbol1s.OnNext(someClass.Symbol1);(或类似的)就可以了。

您不必使用主题来定义您的可观察对象。如果在您的程序中更有意义,您可以使用事件或其他来源。

于 2013-09-11T03:15:27.150 回答