15

在一个IObservable序列中(在 Reactive Extensions for .NET 中),我想获取先前和当前元素的值,以便我可以比较它们。我在网上找到了一个类似于下面的示例,它完成了任务:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

它工作正常,除了它两次评估序列,我想避免。您可以看到使用以下代码对其进行了两次评估:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

输出显示的调试行数是序列中元素数的两倍。

我理解为什么会发生这种情况,但到目前为止,我还没有找到不评估序列两次的替代方案。如何仅通过一个序列评估将先前和当前结合起来?

4

5 回答 5

30

我认为有一个更好的解决方案,它使用 Observable.Scan 并避免双重订阅:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

我已经在我的博客上写了这个:http ://www.zerobugbuild.com/?p=213

附录

进一步的修改允许您通过使用结果选择器更干净地处理任意类型:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}
于 2013-05-16T07:35:08.040 回答
4

@James World 附录对我来说看起来很棒,如果不是因为Tuple<>,我几乎总是不喜欢:“ .Item1 是以前的吗?还是现在的?我不记得了。选择器的第一个参数是什么,是吗上一项? ”。

对于那部分,我喜欢@dcstraw 对专用的定义ItemWithPrevious<T>。所以你去吧,把两者放在一起(希望我没有把以前的和现在的混为一谈)和一些重命名和设施:

public static class ObservableExtensions
{
    public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
        this IObservable<TSource> source, 
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source.Scan(seed,
            (acc, current) => SortedPair.Create(current, acc.Current));
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<SortedPair<TSource>, TResult> resultSelector,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source
            .Scan(seed,
                (acc, current) => SortedPair.Create(current, acc.Current))
            .Select(p => resultSelector(p));
    }
}

public class SortedPair<T>
{
    public SortedPair(T current, T previous)
    {
        Current = current;
        Previous = previous;
    }

    public SortedPair(T current) : this(current, default(T)) { }

    public SortedPair() : this(default(T), default(T)) { }

    public T Current;
    public T Previous;
}

public class SortedPair
{
    public static SortedPair<T> Create<T>(T current, T previous)
    {
        return new SortedPair<T>(current, previous);
    }

    public static SortedPair<T> Create<T>(T current)
    {
        return new SortedPair<T>(current);
    }

    public static SortedPair<T> Create<T>()
    {
        return new SortedPair<T>();
    }
}
于 2015-08-21T10:22:18.517 回答
3

评估两次是Cold observable的指标。您可以使用 .Publish() 将其变为热门:

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();
于 2010-05-13T01:19:05.417 回答
0

如果您只需要在订阅期间访问上一个元素,这可能是最简单的方法。(我敢肯定有更好的方法,也许是 IObservable 上的缓冲区运算符?目前文档很少,所以我不能告诉你。)

    EventArgs prev = null;

    sequence.Subscribe(curr => 
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        prev = curr;                              

    });

EventArgs 只是事件参数类型的替代。

于 2010-05-12T17:13:41.507 回答
-1

事实证明,您可以使用变量来保存先前的值并引用它并在IObservable扩展链中重新分配它。这甚至可以在辅助方法中使用。使用下面的代码,我现在可以调用CombineWithPrevious()myIObservable来获取对先前值的引用,而无需重新评估序列。

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
于 2010-05-12T19:18:26.013 回答