我想IObservable<T>
寻找与谓词匹配的元素,如果找不到,则返回IObservable<T>
. 我不想存储整个内容,IObservable<T>
也不想循环IObservable
两次,所以我设置了一个扩展方法
public static class ObservableExtensions
{
public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
{
return Observable.Create<T>(o =>
{
var hot = source.Publish();
var store = new AsyncSubject<T>();
var d1 = hot.Subscribe(store);
var d2 = hot.FirstAsync(x => pred(x)).Amb(store).Subscribe(o);
var d3 = hot.Connect();
return new CompositeDisposable(d1, d2, d3);
});
}
public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
{
return source.FirstOrLastAsync(pred).Wait();
}
}
Async 方法从传入的可能很冷的 observable 创建一个 hot observable。它订阅 anAsyncSubject<T>
以记住最后一个元素,并订阅 anIObservable<T>
查找该元素。然后它从这些IObservable<T>
s 中的任何一个中获取第一个元素,该元素首先通过.Amb
(在收到消息AsyncSubject<T>
之前不返回值) 返回一个值。.OnCompleted
我的问题如下:
- 这可以使用不同的 Observable 方法更好或更简洁地编写吗?
- 是否所有这些一次性用品都需要包含在 CompositeDisposable 中?
- 当 hot observable 在没有找到匹配元素的情况下完成时,FirstAsync 抛出异常和 AsyncSubject 传播其值之间是否存在竞争条件?
- 如果是这样,我是否需要将行更改为:
var d2 = hot.Where(x => pred(x)).Take(1).Amb(store).Subscribe(o);
我对 RX 很陌生,这是我对 IObservable 的第一个扩展。
编辑
我最终去了
public static class ObservableExtensions
{
public static IObservable<T> FirstOrLastAsync<T>(this IObservable<T> source, Func<T, bool> pred)
{
var hot = source.Publish().RefCount();
return hot.TakeLast(1).Amb(hot.Where(pred).Take(1).Concat(Observable.Never<T>()));
}
public static T FirstOrLast<T>(this IObservable<T> source, Func<T, bool> pred)
{
return source.FirstOrLastAsync(pred).First();
}
}