I was playing around with LINQ/Rx and wanted to do something simple: create a stream that listens to two different streams of numbers and, whenever either stream gets a new value, outputs the sum.

I tried the below:

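using System;
using System.Reactive.Linq;      // Rx query operators such as SelectMany
using System.Reactive.Subjects;  // Subject<T>

class Program
{
    static void Main(string[] args)
    {
        var numSrcA = new Subject<int>();
        var numSrcB = new Subject<int>();

        // Query-syntax version; the compiler translates this to SelectMany.
        var resultsLinq = from a in numSrcA
                          from b in numSrcB
                          select a + b;

        // The same query written with the operator directly.
        var resultsRx = numSrcA.SelectMany(a => numSrcB, (a, b) => a + b);

        resultsLinq.Subscribe(r => Console.WriteLine("Linq: " + r));
        resultsRx.Subscribe(r => Console.WriteLine("Rx: " + r));

        numSrcA.OnNext(1);
        numSrcB.OnNext(2);
        numSrcA.OnNext(3);

        Console.ReadLine();
    }
}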

Output:

Linq: 3
Rx: 3

I was expecting:

Linq: 3
Rx: 3
Linq: 5
Rx: 5

However, it seems that a new sum is only computed when numSrcB gets a new value (in which case that new value is summed against every value numSrcA has produced so far).
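
To make that behaviour concrete, here is a minimal sketch that reuses the resultsRx query and pushes one extra value into numSrcB. The class name SelectManyDemo, the Run helper and the extra value 10 are made up for illustration, and the code assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // assumes the System.Reactive package is referenced
using System.Reactive.Subjects;

static class SelectManyDemo
{
    // Hypothetical helper: returns the sums in the order they are produced.
    public static List<int> Run()
    {
        var numSrcA = new Subject<int>();
        var numSrcB = new Subject<int>();
        var sums = new List<int>();

        // Same shape as resultsRx: every value of A opens a new subscription to B.
        numSrcA.SelectMany(a => numSrcB, (a, b) => a + b)
               .Subscribe(s => sums.Add(s));

        numSrcA.OnNext(1);   // subscribes to B for a = 1, nothing emitted yet
        numSrcB.OnNext(2);   // 1 + 2 = 3
        numSrcA.OnNext(3);   // subscribes to B for a = 3, nothing emitted yet
        numSrcB.OnNext(10);  // 1 + 10 = 11, then 3 + 10 = 13

        return sums;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));  // 3, 11, 13
    }
}
```

Only the pushes into numSrcB produce output, and each one is paired with every value numSrcA has produced so far.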

Can this be changed so that a new value on either source (numSrcA or numSrcB) recomputes the sum?

1 Answer

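Short answer: you want a combinator that does not require new input on all streams to trigger evaluation of the result. In other words, you want CombineLatest: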

var resultsLinq2 = Observable.CombineLatest(numSrcA, numSrcB, (a, b) => a + b);
resultsLinq2.Subscribe(r => Console.WriteLine("Linq2: " + r));

The result is:

Linq2: 3
Linq: 3
Rx: 3
Linq2: 5
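
For anyone who wants to try this in isolation, here is a minimal self-contained sketch of the same idea. The class name CombineLatestDemo, the Run helper and the extra value 10 are made up for illustration, and the code assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // assumes the System.Reactive package is referenced
using System.Reactive.Subjects;

static class CombineLatestDemo
{
    // Hypothetical helper: returns the sums in the order they are produced.
    public static List<int> Run()
    {
        var numSrcA = new Subject<int>();
        var numSrcB = new Subject<int>();
        var sums = new List<int>();

        // Emits a + b using the most recent value from each source.
        numSrcA.CombineLatest(numSrcB, (a, b) => a + b)
               .Subscribe(s => sums.Add(s));

        numSrcA.OnNext(1);   // nothing yet: B has not produced a value
        numSrcB.OnNext(2);   // 1 + 2 = 3
        numSrcA.OnNext(3);   // 3 + 2 = 5
        numSrcB.OnNext(10);  // 3 + 10 = 13

        return sums;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));  // 3, 5, 13
    }
}
```

Note that CombineLatest stays silent until every source has produced at least one value, which is why the first OnNext(1) prints nothing. If a sum is wanted from the start, seeding each source with StartWith(0) before combining is one option.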
Answered 2013-04-19T16:41:08.767