我再次与 Rx 作斗争。这次我遇到了 Observable.Interval 的问题。
我的要求是:
- 我需要每 1 秒运行一次数据收集。
- 我需要每 5 秒检查一次参数的更改。
- 在检查更改时,我无法收集数据。
- 如果数据收集或检查更改的时间超过 1 秒,请不要将这些记号排队,只需跳过它们。
- 如果在数据收集过程中发生更改检查,我希望它等待执行。
我已经尝试使用 Observables 作为间隔,并发现默认情况下,Intervals 将排队错过的滴答声!最后,在创建了一个完整的控制台应用程序之后,我找到了一个演示我需要的示例。这个实现似乎只适用于 Scheduler.NewThread。我的新问题是我根本无法测试这个实现,因为测试调度程序似乎是 CurrentThread。
我的控制台应用示例代码:
var otherThreadScheduler = Scheduler.NewThread;
cancel = otherThreadScheduler.Schedule(
TimeSpan.FromSeconds(1),
recursive =>
{
lock (obj)
{
Console.WriteLine(
"Processing Data - Thread ID = " + Thread.CurrentThread.ManagedThreadId);
var t = new Task(
() =>
{
Console.WriteLine(
"Hi I'm the task on thread {0}",
Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
});
t.Start();
Console.WriteLine(
"Processing Data Waiting for it to finish - Thread ID = "
+ Thread.CurrentThread.ManagedThreadId);
t.Wait();
}
Console.WriteLine("Processing Data finished - Thread ID = " + Thread.CurrentThread.ManagedThreadId);
recursive(TimeSpan.FromSeconds(1));
});
cancel2 = otherThreadScheduler.Schedule(
TimeSpan.FromSeconds(1),
recursive =>
{
lock (obj)
{
Console.WriteLine("Processing Detection - Thread ID = " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10000);
}
recursive(TimeSpan.FromSeconds(5));
});
这不可测试的原因是我的真实代码中的任务是可模拟的,所以我模拟它循环,直到我发出信号停止但由于我的代码执行 task.Wait(),我当前的线程阻塞,所以我永远无法向任务发出信号返回。所有这一切的重点是模拟长时间运行的数据收集并验证更改检测不会触发。
所以我的问题是:有没有更优雅的解决方案来满足我的需求?