4

我正在使用绑定到 COM 端口的 Reactive Extensions Observable 数据流,并且我正在显示来自该数据流的缓冲区,该数据流在一个时间间隔内获取。

这是我的基本 Rx 代码,其中以 25 毫秒的块返回字节数据。我想在第一次达到特定阈值时触发缓冲区的生成,然后仅在收集前一个缓冲区后再次执行此操作。

var o = serialData.Buffer(TimeSpan.FromMilliseconds(25))
                  .ObserveOn(SynchronizationContext.Current);
var mySerialObserver = o.Subscribe<IList<byte>>(SubscribeAction());

serialData 对象是来自 USB COM 端口的连续字节值流的 IObservable。代码改编自 Bart De Smet 帖子:

如何使用 Rx 实现 SerialPort 解析器

使用 Rx Buffer(TimeSpan) 方法,我可以对 serialData 进行采样并在图形上显示缓冲区值(在我的 SubscribeAction 方法中使用DynamicDataDisplay )。

我想将功能扩展为类似于示波器触发器,这可能涉及在 serialData 值超过给定阈值时调用 Rx Buffer 方法,但不收集重叠缓冲区(这类似于示波器时基触发在一定的输入电压下,但在扫描完成之前不会再次触发)

请有人能给我一些关于如何实施的想法吗?

4

1 回答 1

3

缓冲区只会在缓冲区关闭之前释放所有值,这对于实时图表不是很有用。您必须将值拆分为不重叠的窗口 - 从给定的触发器开始,并在扫描条件完成时关闭 - 一个完整的扫描周期的窗口。不幸的是,窗口在启动时仍会为我们提供值,因此我们将不得不跳过触发器触发之前进入的所有值。

    static IObservable<IObservable<T>> TriggeredSweep<T>(
        this IObservable<T> source,
        Func<T, bool> triggerCondition,
        Func<T, bool> sweepEnd
        )
    {
        source = source.Publish().RefCount();
        return source.Window(() => source.Where(triggerCondition).Sample(source.Where(sweepEnd)))
                     .Select(s => s.SkipWhile(v => !triggerCondition(v)));

    }

测试这一点的最佳方法是使用它所依据的示波器模型:

        double period = 1000 / 0.5; //0.5 Hz
        int cycles = 4;             //cycles to display
        int quantization = 100;     //cycles to display            
        int amplitude = 10;         //signal peak            

        int range = quantization * cycles;    //full range 

        //Sine wave generator for n cycles
        //makes tuple of (t, sin(t))
        var source = Observable.Interval(TimeSpan.FromMilliseconds(period / range))
                               .Select(s => s % (range + 1))
                               .Select(s => Tuple.Create(s, amplitude * Math.Sin((double)s / ((double)range / (double)cycles) * 2 * Math.PI)));


        source.TriggeredSweep(
            value => value.Item2 > 5, //Trigger when Signal value > 5
            value => value.Item1 / quantization >= cycles //end sweep when all cycles are done
            )
              .Subscribe(window =>
              {
                  Console.Clear(); //Clear CRO Monitor

                  window.Subscribe(value =>
                  {
                      //Set (x, y)
                      Console.CursorLeft = (int)((double)value.Item1 / range * (Console.WindowWidth - 1));
                      Console.CursorTop = (int)((amplitude - value.Item2) / (2 * amplitude) * (Console.WindowHeight - 1));

                      //draw
                      Console.Write("x");
                  });
              });

        //prevent close
        Console.ReadLine();

输出:

    xxx                   xxxx                   xxx                   xxxx
   xx  x                  x  x                  xx  x                  x  x
   x   x                 xx   x                 x   x                 xx   x
  xx    x                x    x                xx    x                x    x
  x     x               xx     x               x     x               xx     x
  x      x              x      x               x      x              x      x
  x      x              x      x              xx      x              x      x
         x              x       x             x       x              x       x
         x             x        x             x       x             x        x
          x            x        x             x        x            x        x
          x            x        xx           x         x            x        xx
          x           x          x           x         x           x          x
           x          x          x           x          x          x          x
           x          x          x           x          x          x          x           x
           x          x          x          x           x          x          x          x
           x          x           x         x           x          x           x         x
           xx        x            x         x           xx        x            x         x
            x        x            x        x             x        x            x        x
            x        x             x       x             x        x             x       x
            x       x              x       x             x       x              x       x
             x      x              x      xx              x      x              x      xx
             x      x               x     x               x      x               x     x
             x     xx               x     x               x     xx               x     x
              x    x                x    xx                x    x                x    xx
              x   xx                 x   x                 x   xx                 x   x
               x  x                  x  xx                  x  x                  x  xx
               xxxx                   xxx                   xxxx                   xxx
                x                      x                     x                      x

我希望这段代码可能对使用 Rx 测试简单的信号处理功能有用。:)

于 2012-09-28T22:53:23.177 回答