1

我想IObservable<T>寻找与谓词匹配的元素,如果找不到,则返回IObservable<T>. 我不想存储整个内容,IObservable<T>也不想循环IObservable两次,所以我设置了一个扩展方法

public static class ObservableExtensions
{
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return Observable.Create<T>(o =>
        {
            var hot = source.Publish();
            var store = new AsyncSubject<T>();
            var d1 = hot.Subscribe(store);
            var d2 = hot.FirstAsync(x => pred(x)).Amb(store).Subscribe(o);
            var d3 = hot.Connect();
            return new CompositeDisposable(d1, d2, d3);
        });
    }

    public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstOrLastAsync(pred).Wait();
    }
}

Async 方法从传入的可能很冷的 observable 创建一个 hot observable。它订阅 anAsyncSubject<T>以记住最后一个元素,并订阅 anIObservable<T>查找该元素。然后它从这些IObservable<T>s 中的任何一个中获取第一个元素,该元素首先通过.Amb(在收到消息AsyncSubject<T>之前不返回值) 返回一个值。.OnCompleted

我的问题如下:

  • 这可以使用不同的 Observable 方法更好或更简洁地编写吗?
  • 是否所有这些一次性用品都需要包含在 CompositeDisposable 中?
  • 当 hot observable 在没有找到匹配元素的情况下完成时,FirstAsync 抛出异常和 AsyncSubject 传播其值之间是否存在竞争条件?
  • 如果是这样,我是否需要将行更改为:

var d2 = hot.Where(x => pred(x)).Take(1).Amb(store).Subscribe(o);

我对 RX 很陌生,这是我对 IObservable 的第一个扩展。

编辑

我最终去了

public static class ObservableExtensions
{
    public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        var hot = source.Publish().RefCount();
        return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
    }

    public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
    {
        return source.FirstOrLastAsync(pred).First();
    }
}
4

2 回答 2

1

您可以将您想要的两个案例放在一起。如果你的源 observable 是冷的,你可以做一个Publish|Refcount.

    public static IObservable<T> FirstOrLast<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return source.TakeLast(1).Amb(source.Where(predicate).Take(1));
    }

测试:

        var source = Observable.Interval(TimeSpan.FromSeconds(0.1))
                               .Take(10)
                               .Publish()
                               .RefCount();

        FirstOrLast(source, i => i == 5).Subscribe(Console.WriteLine); //5
        FirstOrLast(source, i => i == 11).Subscribe(Console.WriteLine); //9
于 2012-09-05T07:30:00.653 回答
0

我试图产生一个有效的“更简单”的查询,但到目前为止没有。

如果我坚持你的基本结构,我可以提供一点改进。试试这个:

public static IObservable<T> FirstOrLastAsync<T>(
    this IObservable<T> source, Func<T, bool> pred)
{
    return Observable.Create<T>(o =>
    {
        var hot = source.Publish();
        var store = new AsyncSubject<T>();
        var d1 = hot.Subscribe(store);
        var d2 =
            hot
                .Where(x => pred(x))
                .Concat(store)
                .Take(1)
                .Subscribe(o);
        var d3 = hot.Connect();
        return new CompositeDisposable(d1, d2, d3);
    });
}

它并没有好得多,但我喜欢它比使用Amb. 我认为这只是一个更清洁的东西。

于 2012-09-05T08:26:57.503 回答