4

我有两个 orderedIObservable<double>并想将它们合并为一个 ordered IObservable<double>。下面给出一个例子:

A    2  3  4  -  -       5  -
B    -  -  -  1  5       -  6
Out  -  -  -  1  2 3 4   5  -

这个想法是Out只有在确定最终顺序时才会产生值。我确信这应该很容易做到,但我无法提供一个好的解决方案(在这种情况下,好的意味着尽可能多地由 rx 运算符组成);

编辑:我希望以下程序产生以下输出

static void Main(string[] args)
{
    var a = new Subject<int>();
    var b = new Subject<int>();

    a.MergeSort(b).Subscribe(Console.WriteLine);

    a.OnNext(2);
    Console.WriteLine("tick");
    a.OnNext(4);
    Console.WriteLine("tick");
    a.OnNext(6);
    Console.WriteLine("tick");
    b.OnNext(0);
    Console.WriteLine("tick");
    b.OnNext(1);
    Console.WriteLine("tick");
    b.OnNext(5);
    Console.WriteLine("tick");
    b.OnNext(7);
    Console.WriteLine("tick");
}

Output:
tick
tick
tick
0
tick
1
tick
2
4
5
tick
6
tick
4

1 回答 1

1

这是作为 RX 扩展运算符执行的

public static class MergeMixins
{
    public static IObservable<int> MergeSort(this IObservable<int> This, IObservable<int> other)
    {
        return Observable.Create<int>((observer) =>
            {
                Queue<int> BufferA = new Queue<int>();
                Queue<int> BufferB = new Queue<int>();

                Action<Queue<int>, int> update = (Queue<int> pushBuffer, int value)=>{

                    pushBuffer.Enqueue(value);


                    while (BufferA.Count() != 0 && BufferB.Count() != 0)
                    {
                        if (BufferA.Peek() < BufferB.Peek())
                            observer.OnNext(BufferA.Dequeue());
                        else
                            observer.OnNext(BufferB.Dequeue());
                    }
                };

                return new CompositeDisposable(
                    This.Subscribe(v => update(BufferA, v)),
                    other.Subscribe(v => update(BufferB, v)));

            });

    }

}

广告我的测试输出正在使用您的测试

Result StandardOutput:  
tick
tick
tick
0
tick
1
tick
2
4
5
tick
6
tick
于 2012-11-21T07:16:18.550 回答