4

我有一种感觉,这可能是我错过的一个非常简单的扩展方法,但我看不到它......

我基本上想采用一个流来产生一个流,其中值随着每个新值缓慢增加。我想节流/采样这个,不是按时间,而是按“容忍度”。例如

var ob = Enumerable.Range(0, 30).ToObservable(); // 0, 1, 2, 3, 4, 5,....., 30
var largeMovingOb = ob.WhenChangedBy(10); // 0, 10, 20, 30

当我有诸如 [1, 4, 20, 33] 之类的序列并且我想在值更改超过最后一个的 15 时输出 - 这将导致:[1, 20]。其中值 12 的变化将导致:[1,20,33]

是否有内置的 Rx 扩展?理想情况下,它适用于所有数字类型,而无需为每种类型编写重载。

4

4 回答 4

9

我认为这很适合Observable.Scan

var ob = Enumerable.Range(0, 30).ToObservable();
var largeMovingOb = ob.Scan((acc, i) => acc + 10 > i ? acc : i)
  .DistinctUntilChanged();
于 2012-11-22T00:22:14.933 回答
0

您可以重新利用的另一个内置运算符是DistinctUntilChanged,它将为您跟踪最后一个值。这里最大的“hack”是IEqualityComparer可能不遵循平等的标准期望(a == b && b == c并不意味着a == c取决于函数)

public static IObservable<T> DistinctUntilChangedBy<T>(
    this IObservable<T> source, Func<T, T, bool> isChanged)
{
    //check arguments
    return source.DistinctUntilChanged(new MarginEqualityComparer<T>(isChanged));
}

class MarginEqualityComparer<T> : IEqualityComparer<T>
{
    MarginEqualityComparer(Func<T, T, bool> comparer)
    {
        _comparer = comparer;
    }

    private readonly Func<T, T, bool> _comparer;

    public bool Equals(T x, T y)
    {
        return _comparer(x, y);
    }

    public int GetHashCode(T obj)
    {
        throw new NotSupportedException("This comparer does not support hashing.");
    }
}
于 2012-11-21T18:07:05.610 回答
0

有一个内置的运算符可以执行您想要的操作。

试试这个:

var ob = Observable.Range(0, 30);
var largeMovingOb = ob.DistinctUntilChanged(x => x / 10);

它适用于任何类型,而不仅仅是数字类型,因为签名如下所示:

IObservable<TSource> DistinctUntilChanged<TSource, TKey>(
    this IObservable<TSource> source, Func<TSource, TKey> keySelector)

简单的。

于 2012-11-22T00:43:15.337 回答
-1

您可以在上使用Where扩展方法IObservable<T>,跟踪您上次产生的内容,并且仅当值超过公差级别的最后产生的值时,谓词才返回 true。

这可以包装到一个扩展方法中,该方法利用闭包来执行此操作,如下所示:

public static IObservable<int> WhenLastObservedChangesByMoreThan(
    this IObservable<int> observable, int tolerance)
{
    // Validate parameters.
    if (observable == null) throw new ArgumentNullException("observable");

    // Tolerance must be positive, so comparisons are correct after
    // addition/subtraction.
    if (tolerance < 0) 
        throw new ArgumentOutOfRangeExeption("tolerance", tolerance,
            "The tolerance parameter must be a non-negative number.");

    // Shortcut: If tolerance is 0, then every value is returned, just
    // return the observable.
    if (tolerance == 0) return observable;

    // The last value yielded.
    int? lastYielded = null;

    // Filter.
    observable = observable.Where(i => {
        // If there is a previous value
        // that was yielded.
        if (lastYielded != null)
        {
            // Is the last value within
            // tolerance?
            if (i - tolerance < i && i < i + tolerance)
            {
                // Do not process.
                return false;
            }
        }

        // This is being yielded, store the value.
        lastYielded = i;

        // Yield the value.
        return true;
    });
}

请注意,上面不是线程安全的,如果您从多个线程IObservable<T>调用OnNext,那么您必须锁定对lastYielded变量的访问(这很容易使用lock语句来完成)。

于 2012-11-21T17:28:22.407 回答