2

我有一个热的可观察传感器数据流。我需要一个可观察的信号,该信号仅在传感器值低于15 一段时间后触发。如果在任何时候该值超过 15,它应该重置滑动窗口。我已经使用下面的代码使其部分工作 - 但是如果值始终低于 15,它不会触发。

var notification = _sensor.Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));

有什么建议么?

4

2 回答 2

1

如果_sensor从不发出等于或大于 15 的值,Throttle则永远不会调用 。

简单的解决方法是将唤醒通知添加到_sensornotification

var wakeup = Observable.Return(15);

var notification = _sensor.Merge(wakeup)
                          .Where(v => v >= 15)
                          .Throttle(new TimeSpan(0, 1, 0))
                          .SelectMany(_ => Observable.Return(Unit.Default));
于 2015-09-09T07:38:48.783 回答
0

如果我正确理解您的需求,则此方法有效:

var notification = _sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromMinutes(1.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

我假设这_sensor是一个IObservable<double>.

_sensor因此,只要这些值保持在 15.0 以下至少一分钟,这个 observable 就会发布流中的所有值。我已经测试了这个功能。

我的测试代码是:

var random = new Random();
var _sensor = Observable.Generate(
    0,
    x => true,
    x => x,
    x => random.NextDouble() * 16.0,
    x => TimeSpan.FromSeconds(random.NextDouble()));

var published_sensor = _sensor.Publish();

var notification = published_sensor.Publish(ps => ps
    .Select(x => x >= 15.0)
    .DistinctUntilChanged()
    .Select(p => p
        ? Observable.Empty<double>()
        : Observable
            .Timer(TimeSpan.FromSeconds(5.0))
            .Select(x => -1.0)
            .IgnoreElements()
            .Concat(ps))
    .Switch());

published_sensor.Merge(notification).Timestamp().Dump();

published_sensor.Connect();

我得到的结果是:

结果

请注意,在发布重复值之前会经过 5 秒,并且当源产生超过 15 的值时重复停止。

于 2015-09-09T08:23:07.440 回答