0

我需要去抖动输入流。

在第一次出现状态 1 时,我需要等待 5 秒并验证最后状态是否也是 1。只有我有一个稳定的信号。

(time)  0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-1
(result)                   -> 1

这是一个不稳定信号的例子。

(time)  0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-0
(result)                   -> 0

我尝试使用缓冲区,但缓冲区的起点是固定的,我需要从第一个事件开始等待 5 秒。

4

2 回答 2

4

从字面上理解您的要求

在第一次出现状态 1 时,我需要等待 5 秒并验证最后状态是否也是 1。只有我有一个稳定的信号。

我可以想出一些方法来解决这个问题。为了澄清我的假设,您只想在第一次出现 1 后 5 秒推送最后一个值。这将导致单个值序列产生 0 或 1(即,无论产生超过 5 的任何其他值距离源序列的秒数)

在这里,我用一些 jiggery-pokery 重新创建了你的序列。

var source = Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1))
    .Take(10)
    .Select(i=>{if(i==5 || i==7 || i==9){return 1;}else{return 0;}}); //Should produce 1;
    //.Select(i=>{if(i==5 || i==7 ){return 1;}else{return 0;}});    //Should produce 0;

下面的所有选项看起来都共享顺序。为了在 Rx 中安全地共享一个序列,我们 Publish() 并连接它。我通过 RefCount() 运算符使用自动连接。

var sharedSource = source.Publish().RefCount();

1) 在这个解决方案中,我们取第一个值 1,然后将选定的序列值缓冲到 5 秒的缓冲区大小。我们只取第一个缓冲区。一旦我们得到这个缓冲区,我们就会得到最后一个值并推送它。如果缓冲区为空,我假设我们推入一个,因为最后一个值是启动缓冲区运行的“1”。

sharedSource.Where(state=>state==1)
            .Take(1)
            .SelectMany(_=>sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1))
            .Select(buffer=>
            {   
                if(buffer.Any())
                {
                    return buffer.Last();
                }
                else{
                    return 1;
                }
            })
            .Dump();

2) 在这个解决方案中,我采取的方法是,只有在我们获得有效值 (1) 后才开始侦听,然后获取所有值,直到计时器触发终止。从这里我们取最后一个产生的值。

var fromFirstValid = sharedSource.SkipWhile(state=>state==0);
fromFirstValid 
    .TakeUntil(
        fromFirstValid.Take(1)
                    .SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5))))
    .TakeLast(1)
    .Dump();

3)在这个解决方案中,我使用窗口运算符创建一个窗口,当第一个值 '1' 发生时打开,然后在 5 秒后关闭。同样,我们只取最后一个值

sharedSource.Window(
                sharedSource.Where(state=>state==1),
                _=>Observable.Timer(TimeSpan.FromSeconds(5)))
            .SelectMany(window=>window.TakeLast(1))
            .Take(1)
            .Dump();

给猫剥皮的方法有很多。

于 2013-03-22T17:47:46.797 回答
1

听起来(一目了然)像您想要的那样Throttle,而不是Buffer,尽管有关您的用例的更多信息将有助于确定这一点 - 无论如何,这就是您Throttle的流式传输方式:

void Main()
{
    var subject = new Subject<int>();
    var source = subject.Publish().RefCount();

    var query = source
        // Start counting on a 1, wait 5 seconds, and take the last value
        .Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5)));

    using(query.Subscribe(Console.WriteLine))
    {
        // This sequence should produce a one
        subject.OnNext(1);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(1);
        Console.ReadLine();
        // This sequence should produce a zero
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(0);
        Console.ReadLine();
    }
}
于 2013-03-21T00:27:24.003 回答