1

我有两个价值流和一个选择器流,我想生成一个基于选择器在价值流之间交替的结果流。下面的代码给出了正确的结果,但我不喜欢它。

有人有更整洁的吗?

var valueStreamA = new BehaviorSubject<int>(0);
var valueStreamB = new BehaviorSubject<int>(100);
var selectorStream = new BehaviorSubject<bool>(true);

var filteredA = valueStreamA .CombineLatest(selectorStream, (a, c) => new { A = a, C = c })
  .Where(ac => ac.C)
  .Select(ac => ac.A);
var filteredB = valueStreamB.CombineLatest(selectorStream, (b, c) => new { B = b, C = c })
  .Where(bc => !bc.C)
  .Select(bc => bc.B);

var result = Observable.Merge(filteredA, filteredB);
result.Subscribe(Console.WriteLine);

valueStreamA.OnNext(1);
valueStreamB.OnNext(101);
selectorStream.OnNext(false);

valueStreamA.OnNext(2);
valueStreamB.OnNext(102);
selectorStream.OnNext(true);

这会产生以下输出:

0
1
101
102
2
4

3 回答 3

2

我会做这样的事情:

var a = new BehaviorSubject<int>(0);
var b = new BehaviorSubject<int>(100);
var c = new BehaviorSubject<bool>(true);

var valueStreamA = a as IObservable<int>;
var valueStreamB = b as IObservable<int>;
var selector = c as IObservable<bool>;

var result = selector
    // for every change in the selector...
    .DistinctUntilChanged()
    // select one of the two value streams
    .Select(change => change ? valueStreamA : valueStreamB)
    // and flatten the resulting wrapped observable
    .Switch();

result.Subscribe(Console.WriteLine);

a.OnNext(1);
b.OnNext(101);
c.OnNext(false);

a.OnNext(2);
b.OnNext(102);
c.OnNext(true);
于 2013-02-07T21:24:43.043 回答
0

可以做类似的事情:

var xs = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => Feeds.Xs);
var ys = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => Feeds.Ys);
var selectorSubject = new Subject<Feeds>();

var query = from selector in selectorSubject
                select from merged in xs.Merge(ys)
                where merged == selector
                select merged;

query.Switch().Subscribe(Console.WriteLine);

OnNext 进入你的'selectorSubject'来改变它。您的示例有一些差异,但很容易解决:

  1. 您的问题涉及 bool 类型的选择器,而我一直很懒惰并重用 Feeds 枚举,以便让我进行简单的相等性检查(合并 == 选择器)。你当然可以简单地做(其中选择器?合并== Xs:合并== Ys),或者类似的事情来评估每个合并的项目并丢弃你不关心的项目(取决于你的选择器)。

具体来说,您可能不仅要选择整数,还要选择提要的标识符。考虑使用 Tuple.Create() 之类的东西,这样每次更新都会获得该信息:{A - 1}、{B - 101} 等。然后你可以在哪里做:哪里选择器?merge.Item1 == A : mapped.Item1 == B //这将“true”映射到 feed A

  1. 我还使用了一个 Switch,这将导致我的示例流重新启动,因为它们没有发布。你可能想发布你的并连接它们(让它们“热”),所以像我这样的 Switch 不会在订阅中造成任何新的副作用。您有一个主题(很热门),但“行为”部分将替换您传递给构造函数的值。发布和连接会阻止这种情况。

如果您仍然感到困惑,请大声喊叫。这不是一个完整的答案,但可能会给你足够的思考。

霍华德。

于 2013-02-07T21:01:26.787 回答
0

现在更接近你原来的问题:

void Main()

{

var valueStreamA = new BehaviorSubject<int>(0);
var valueStreamB = new BehaviorSubject<int>(100);

var selectorStreamA = valueStreamA.Select(id => Tuple.Create("A", id)).Publish();
var selectorStreamB = valueStreamB.Select(id => Tuple.Create("B", id)).Publish();
var selectorStream = new BehaviorSubject<bool>(true);

var query = from selector in selectorStream
            select from merged in selectorStreamA.Merge(selectorStreamB)
                   where selector == true ? merged.Item1 == "A" : merged.Item1 == "B"
                   select merged.Item2;

query.Switch().Subscribe(Console.WriteLine);

selectorStreamA.Connect();
selectorStreamB.Connect();

//First we get 0 output (because we are already using stream A, and it has a first value)
valueStreamA.OnNext(1); //This is output, because our selector remains as 'A'
valueStreamB.OnNext(101); //This is ignored - because we don't take from B
selectorStream.OnNext(false); //Switch to B

valueStreamA.OnNext(2); //Ignored - we are now using B only
valueStreamB.OnNext(102); //This is output
selectorStream.OnNext(true); //Switch back to A.

}

输出:

0 1 102

于 2013-02-07T21:21:54.123 回答