我有大量简单的配对类:
public class Pair { public DateTime Timestamp; public double Value; }
它们按升序时间戳排序。我想在适当的时候为列表中的每个项目触发一个带有值(比如 Action<double>)的事件。时间是过去的,所以我需要标准化时间戳,使列表中的第一个是“现在”。我们可以使用 Reactive Extensions 进行设置,以便在两个项目之间的时间差之后触发下一个事件吗?
我有大量简单的配对类:
public class Pair { public DateTime Timestamp; public double Value; }
它们按升序时间戳排序。我想在适当的时候为列表中的每个项目触发一个带有值(比如 Action<double>)的事件。时间是过去的,所以我需要标准化时间戳,使列表中的第一个是“现在”。我们可以使用 Reactive Extensions 进行设置,以便在两个项目之间的时间差之后触发下一个事件吗?
说pairs
是你的序列:
var obs = pairs.OrderBy(p => p.Timestamp).ToObservable();
现在obs
是对作为有序可观察对象。
Observable.Zip(
obs,
obs.Take(1).Concat(obs),
(pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp)
.Select(_ => pair1.Value))
.Concat()
.Subscribe(/* Do something here */);
zip 负责将绝对时间转换为偏移量。它将获取序列并将其与自身连接,但偏移一,如下所示
Original 1--2--4--7--11
Offset 1--1--2--4--7--11
Joined 0--1--2--3--4
然后将这个新值放入以将Observable.Timer
其延迟适当的量。最后Concat
将结果从 a 展平IObservable<IObservable<double>>
为 a IObservable<double>
。这假设您的序列是有序的。
如果通过“使用 Rx”允许我只使用 Rx 调度程序,那么这是一个非常简单的解决方案:
Action<double> action =
x =>
Console.WriteLine(x);
var ts0 = pairs.Select(p => p.Timestamp).Min();
pairs
.ForEach(p =>
Scheduler
.ThreadPool
.Schedule(
p.Timestamp.Subtract(ts0),
() => action(p.Value)));
这使用System.Interactive
extension ForEach
,但您可以只使用常规foreach
循环来加载调度程序。
我已经使用以下虚拟数据测试了代码:
var pairs = new []
{
new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, },
new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, },
new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, },
};
我希望这有帮助。
我认为这个问题很有趣,这将是我第一次尝试。
static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent)
{
if (pairs == null || !pairs.Any() || pairEvent == null)
return;
// if we can promise the pairs are already sorted
// obviously we don't need this next line
pairs = pairs.OrderBy(p => p.Timestamp);
var first = pairs .First().Timestamp;
var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p });
var start = DateTime.Now;
double interval = 250; // 1/4 second
Timer timer = new Timer(interval);
timer.AutoReset = true;
timer.Elapsed += (sender, elapsedArgs) =>
{
var signalTime = elapsedArgs.SignalTime;
var elapsedTime = (signalTime - start);
var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair);
wrapped = wrapped.Skip(pairsToTrigger.Count());
if (!wrapped.Any())
timer.Stop();
foreach (var pair in pairsToTrigger)
pairEvent(pair.Value);
};
timer.Start();
}