2

我想使用一个可以随时填充的 IObservable。

我有这个扩展方法:

public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => selector(v)
                .Do(_ =>
                {

                }, () =>
                {
                    queue.OnNext(new Unit());
                })
            );
    });
}

我使用如下:

_moviesToTranslateObservable = new Subject<IMovie>();
_moviesToTranslateObservable.Drain(s => Observable.Return(s).Delay(TimeSpan.FromMilliseconds(250)))
  .Subscribe(async movieToTranslate =>
      {
      }

推送新项目后:

_moviesToTranslateObservable.OnNext(movieToTranslate);

IObservable 被消耗。

我的问题是,当我添加很多项目时,我不想使用已添加的第一个项目,而是最后添加的项目(如堆栈,而不是队列)。

我怎样才能做到这一点?BehaviorSubject 是否适合堆栈消耗行为?

4

1 回答 1

1

我知道变量名说queue,但这BehaviorSubject并不是真正的队列,它更像是一把锁。排队确实发生在Zip函数内部,该函数带有一个内部队列。

至于在 FIFO 和 LIFO 之间切换,我不确定您想要什么标准,但这里有Drain.

public static IObservable<TOut> DrainReverse<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        var stack = new Stack<TSource>();

        return source
            .Do(item => stack.Push(item))
            .Zip(queue, (v, q) => v)
            .Select(_ => stack.Pop())
            .SelectMany(v => selector(v)
                .Do(_ =>
                {

                }, () =>
                {
                    queue.OnNext(new Unit());
                })
            );
    });
}

与以下运行代码一起使用时:

var s = new Subject<int>();
var d = s.DrainReverse(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(250)));
d.Subscribe(i => Console.WriteLine(i));
s.OnNext(0);
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.OnNext(4);
s.OnNext(5);

正确产生0, 5, 4, 3, 2, 1

于 2017-09-26T14:30:35.293 回答