5

我有大量简单的配对类:

public class Pair { public DateTime Timestamp; public double Value; }

它们按升序时间戳排序。我想在适当的时候为列表中的每个项目触发一个带有值(比如 Action<double>)的事件。时间是过去的,所以我需要标准化时间戳,使列表中的第一个是“现在”。我们可以使用 Reactive Extensions 进行设置,以便在两个项目之间的时间差之后触发下一个事件吗?

4

3 回答 3

7

pairs是你的序列:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable();

现在obs是对作为有序可观察对象。

Observable.Zip(
    obs,
    obs.Take(1).Concat(obs),
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp)
      .Select(_ => pair1.Value))
.Concat()
.Subscribe(/* Do something here */);

zip 负责将绝对时间转换为偏移量。它将获取序列并将其与自身连接,但偏移一,如下所示

Original 1--2--4--7--11
Offset   1--1--2--4--7--11
Joined   0--1--2--3--4

然后将这个新值放入以将Observable.Timer其延迟适当的量。最后Concat将结果从 a 展平IObservable<IObservable<double>>为 a IObservable<double>。这假设您的序列是有序的。

于 2012-05-03T23:49:21.187 回答
2

如果通过“使用 Rx”允许我只使用 Rx 调度程序,那么这是一个非常简单的解决方案:

Action<double> action =
    x =>
        Console.WriteLine(x);

var ts0 = pairs.Select(p => p.Timestamp).Min();

pairs
    .ForEach(p => 
        Scheduler
            .ThreadPool
            .Schedule(
                p.Timestamp.Subtract(ts0),
                () => action(p.Value)));

这使用System.Interactiveextension ForEach,但您可以只使用常规foreach循环来加载调度程序。

我已经使用以下虚拟数据测试了代码:

var pairs = new []
{
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, },
};

我希望这有帮助。

于 2012-05-04T00:57:38.007 回答
0

我认为这个问题很有趣,这将是我第一次尝试。

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent)
{
  if (pairs == null || !pairs.Any() || pairEvent == null)
    return;

  // if we can promise the pairs are already sorted
  // obviously we don't need this next line
  pairs = pairs.OrderBy(p => p.Timestamp);
  var first = pairs .First().Timestamp;
  var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p });

  var start = DateTime.Now;

  double interval = 250; // 1/4 second
  Timer timer = new Timer(interval);

  timer.AutoReset = true;
  timer.Elapsed += (sender, elapsedArgs) =>
  {
    var signalTime = elapsedArgs.SignalTime;
    var elapsedTime = (signalTime - start);

    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair);
    wrapped = wrapped.Skip(pairsToTrigger.Count());

    if (!wrapped.Any())
      timer.Stop();

    foreach (var pair in pairsToTrigger)
      pairEvent(pair.Value);    
  };

  timer.Start();
}
于 2012-05-03T23:55:52.090 回答