对于基于 Rx 的更改跟踪解决方案,我需要一个操作员,它可以让我获得可观察序列中的第一个和最近的项目。
我将如何编写一个生成以下大理石图的 Rx 运算符(注意:括号仅用于排列项目......我不确定如何最好地在文本中表示这一点):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
对于基于 Rx 的更改跟踪解决方案,我需要一个操作员,它可以让我获得可观察序列中的第一个和最近的项目。
我将如何编写一个生成以下大理石图的 Rx 运算符(注意:括号仅用于排列项目......我不确定如何最好地在文本中表示这一点):
xs:---[a ]---[b ]-----[c ]-----[d ]---------|
desired:---[a,a]---[a,b]-----[a,c]-----[a,d]---------|
使用与@Wilka 相同的命名,您可以使用以下扩展名,这有点不言自明:
public static IObservable<TResult> FirstAndLatest<T, TResult>(this IObservable<T> source, Func<T,T,TResult> func)
{
var published = source.Publish().RefCount();
var first = published.Take(1);
return first.CombineLatest(published, func);
}
请注意,它不一定返回 a Tuple
,而是为您提供在结果上传递选择器函数的选项。这使它与底层的主要操作 ( CombineLatest
) 保持一致。这显然很容易改变。
用法(如果你想在结果流中使用元组):
Observable.Interval(TimeSpan.FromSeconds(0.1))
.FirstAndLatest((a,b) => Tuple.Create(a,b))
.Subscribe(Console.WriteLine);
试试这个:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
this IObservable<T> source)
{
return
source
.Take(1)
.Repeat()
.Zip(source, (x0, xn) => Tuple.Create(x0, xn));
}
很简单吧?
或者,作为共享基础源的替代方法,试试这个:
public static IObservable<Tuple<T, T>> FirstAndLatest<T>(
this IObservable<T> source)
{
return
source.Publish(
s =>
s.Take(1)
.Repeat()
.Zip(s, (x0, xn) => Tuple.Create(x0, xn)));
}
哎呀!刮这个。它不起作用。它本质上一直在产生一对最新的值。这样的发布是行不通的。原始实现是最好的。
我怀疑有更好的方法来做到这一点(我不喜欢使用 Do),但你可以创建这样的运算符
public static IObservable<Tuple<T, T>> FirstAndLatest2<T>(this IObservable<T> source)
{
return Observable.Defer(() => {
bool hasFirst = false;
T first = default(T);
return source
.Do(item =>
{
if (!hasFirst)
{
hasFirst = true;
first = item;
}
})
.Select(current => Tuple.Create(first, current));
});
}
然后你会像这样使用它:
Observable.Interval(TimeSpan.FromSeconds(0.1))
.FirstAndLatest()
.Subscribe(Console.WriteLine);