7

可以这么说,我进入了 Rx 狂欢,这个问题与我的herehere相关。尽管如此,也许这些对某人有所帮助,因为我可以将它们视为同一主题的有用变体。

问题:如何将一个随机流int(例如,在随机间隔上产生的间隔 [0, 10])对象分组并为每个组提供可变数量的没有事件警报(由于缺乏更好的定义,对于更多背景见链接帖子)。更具体地说,使用代码,如何在下面定义每组的多个油门设置:

var idAlarmStream = idStream
.Select(i => i)
.GroupByUntil(key => key.Id, grp => grp.Throttle(Timespan.FromMilliseconds(1000))
.SelectMany(grp => grp.TakeLast(1))
.Subscribe(i => Console.WriteLine(i));

如果每个组的 ID 缺席时间超过一秒,就会调用 subscribe 函数。如果想为没有事件定义三个不同的值(例如,一秒、五秒和十秒)并在事件到达时全部取消,该怎么办?我能想到的是:

  • 将每个 ID 拆分idStream为几个合成 ID,并提供真实 ID 和合成 ID 之间的双射映射。例如在这种情况下 ID: 1 -> 100, 101, 102; ID: 2 -> 200, 201, 203 然后Throttle像这样定义一个选择器函数Func<int, Timespan>(i => /* switch(i)...*/),然后什么时候Subscribe被调用,将ID映射回来。另请参阅链接的问题以获取更多背景信息。
  • 创建一个嵌套分组,其中对 ID 进行分组,然后根据限制值将 ID 组进一步复制/复制/分叉(我不知道正确的术语)到组中。我认为这种方法相当复杂,我不确定它是否是最好的方法。不过,我肯定有兴趣看到这样的查询。

我怀疑,在更一般的环境中,这是每个组有多个处理程序的情况,尽管我没有设法找到与此相关的任何内容。

<编辑: 作为(希望澄清)一个示例idStream推送一个 ID:1,在该 ID 上将启动三个不同的计数器,每个计数器都等待下一个事件发生或在没有及时检测到新的 ID 1 时发出警报。计数器 1 (C1) 等待 5 秒,计数器 2 (C2) 等待 7 秒,计数器 3 (C3) 等待 10 秒。如果在间隔 [0, 5] 秒内接收到新的 ID 1,则所有计数器将使用上述值重新初始化,并且不会发送警报。如果在间隔 [0, 7) 秒内接收到新 ID,则 C1 警报以及 C2 和 C3 将重新初始化。同样,如果在间隔 [0, 10) 秒内收到新 ID,C1 和 C2 将触发,但 C3 刚刚重新初始化。

也就是说,在某些条件下,对于一个 ID,将有多个“缺席警报”或一般采取的行动。我不确定什么是一个好的模拟......也许在塔中堆叠“警报灯”,以便首先是绿色,然后是黄色,最后是红色。随着 ID 缺失的持续时间越来越长,一个又一个的颜色将被点亮(在这种情况下,红色是最后一个)。然后当检测到一个ID时,所有的灯都会关闭。

<编辑 2: 在将 James 的代码修改为如下示例并将其余部分保持原样后,我发现Subscribe将在两个警报级别上的第一个事件时直接调用。

const int MaxLevels = 2;
var idAlarmStream = idStream
    .Select(i => i)
    .AlarmSystem(keySelector, thresholdSelector, MaxLevels, TaskPoolScheduler.Default)
    .Subscribe(i =>
    {
        Debug.WriteLine("Alarm on id \"{0}\" at {1}", i, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
    });

让我们看看这里发生了什么,如果MaxLevels可以动态提供......

<编辑 3:詹姆斯的代码有效。问题出在椅子和键盘之间!将时间更改为更合理的时间确实有帮助。事实上,我把它们改成了更大的数字,但它确实是.FromTicks,它让我逃了几分钟。

4

1 回答 1

5

我认为这很有效 - 我稍后会尝试添加更完整的解释。每个警报级别都有一个定义的阈值(每个信号组)。这些预计将持续增加。

基本思想是将所有先前级别的信号输入当前级别。第一级是信号本身的“零”级,在返回警报流之前被过滤掉。请注意,TSignal 键需要支持值标识。

我相信还有简化的空间!

样本单元测试:

public class AlarmTests : ReactiveTest
{
    [Test]
    public void MultipleKeyMultipleSignalMultipleLevelTest()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(800);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(200, 2),
            OnNext(400, 1),
            OnNext(420, 2),
            OnNext(800, 1),
            OnNext(1000, 1),
            OnNext(1200, 1));

        Func<int, int> keySelector = i => i;
        Func<int, int, TimeSpan> thresholdSelector = (key, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(700, new Alarm<int>(1, 1)),
            OnNext(720, new Alarm<int>(2, 1)),
            OnNext(1220, new Alarm<int>(2, 2)),
            OnNext(1500, new Alarm<int>(1, 1)),
            OnNext(2000, new Alarm<int>(1, 2)));
    }

    [Test]
    public void CheckAlarmIsSuppressed()
    {
        var threshold1 = TimeSpan.FromTicks(300);
        var threshold2 = TimeSpan.FromTicks(500);

        var scheduler = new TestScheduler();
        var signals = scheduler.CreateHotObservable(
            OnNext(200, 1),
            OnNext(400, 1),
            OnNext(600, 1));

        Func<int, int> keySelector = i => i;

        Func<int, int, TimeSpan> thresholdSelector = (signal, level) =>
        {
            if (level == 1) return threshold1;
            if (level == 2) return threshold2;
            return TimeSpan.MaxValue;
        };

        var results = scheduler.CreateObserver<Alarm<int>>();

        signals.AlarmSystem(
            keySelector,
            thresholdSelector,
            2,
            scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(900, new Alarm<int>(1, 1)),
            OnNext(1100, new Alarm<int>(1, 2)));
    }
}



public static class ObservableExtensions
{
    /// <summary>
    /// Create an alarm system that detects signal gaps of length
    /// determined by a signal key and signals alarms of increasing severity.
    /// </summary>
    /// <typeparam name="TSignal">Type of the signal</typeparam>
    /// <typeparam name="TKey">Type of the signal key used for grouping, must implement Equals correctly</typeparam>
    /// <param name="signals">Input signal stream</param>
    /// <param name="keySelector">Function to select a key from a signal for grouping</param>
    /// <param name="thresholdSelector">Function to select a threshold for a given signal key and alarm level.
    /// Should return TimeSpan.MaxValue for levels above the highest level</param>
    /// <param name="levels">Number of alarm levels</param>
    /// <param name="scheduler">Scheduler use for throttling</param>
    /// <returns>A stream of alarms each of which contains the signal and alarm level</returns>
    public static IObservable<Alarm<TSignal>> AlarmSystem<TSignal, TKey>(
        this IObservable<TSignal> signals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int levels,
        IScheduler scheduler)
    {
        var alarmSignals = signals.Select(signal => new Alarm<TSignal>(signal, 0))
                                  .Publish()
                                  .RefCount();

        for (int i = 0; i < levels; i++)
        {
            alarmSignals = alarmSignals.CreateAlarmSystemLevel(
                keySelector, thresholdSelector, i + 1, scheduler);
        }

        return alarmSignals.Where(alarm => alarm.Level != 0);

    }

    private static IObservable<Alarm<TSignal>> CreateAlarmSystemLevel<TSignal, TKey>(
        this IObservable<Alarm<TSignal>> alarmSignals,
        Func<TSignal, TKey> keySelector,
        Func<TKey, int, TimeSpan> thresholdSelector,
        int level,
        IScheduler scheduler)
    {
        return alarmSignals
            .Where(alarmSignal => alarmSignal.Level == 0)
            .Select(alarmSignal => alarmSignal.Signal)
            .GroupByUntil(
                keySelector,
                grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
            .SelectMany(grp => grp.TakeLast(1).Select(signal => new Alarm<TSignal>(signal, level)))
            .Merge(alarmSignals);
    }
}

public class Alarm<TSignal> : IEquatable<Alarm<TSignal>>
{
    public Alarm(TSignal signal, int level)
    {
        Signal = signal;
        Level = level;
    }

    public TSignal Signal { get; private set; }
    public int Level { get; private set; }

    private static bool Equals(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        if (ReferenceEquals(x, null))
            return false;
        if (ReferenceEquals(y, null))
            return false;
        if (ReferenceEquals(x, y))
            return true;

        return x.Signal.Equals(y.Signal) && x.Level.Equals(y.Level);
    }

    // Equality implementation added to help with testing.
    public override bool Equals(object other)
    {
        return Equals(this, other as Alarm<TSignal>);
    }

    public override string ToString()
    {
        return string.Format("Signal: {0} Level: {1}", Signal, Level);
    }

    public bool Equals(Alarm<TSignal> other)
    {
        return Equals(this, other);
    }

    public static bool operator ==(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return Equals(x, y);
    }

    public static bool operator !=(Alarm<TSignal> x, Alarm<TSignal> y)
    {
        return !Equals(x, y);
    }

    public override int GetHashCode()
    {
        return ((Signal.GetHashCode()*37) ^ Level.GetHashCode()*329);
    }
}
于 2013-10-18T01:39:13.727 回答