你需要有很多计时器吗?我假设如果你有 20 个东西的集合,那么我们将创建 20 个计时器全部在同一时间点触发?在同一个线程/调度程序上?
或者,也许您想DoWork
在每个时期都对事物进行分析?
IE
from thing in things
from x in Observable.Interval(thing.Interval)
select DoWork(thing.Uri)
对比
Observable.Interval(interval)
.Select(_=>
{
foreach(var thing in Things)
{
DoWork(thing);
}
})
您可以通过多种方式在未来开展工作。
- 您可以直接使用调度程序来安排将来要完成的工作。
- 您可以使用 Observable.Timer 让序列在未来的指定时间产生一个值。
- 您可以使用 Observable.Interval 来生成一个序列,该序列在每个指定的时间段内产生许多值。
所以这现在引入了另一个问题。如果您的轮询时间为 60 秒,而您的工作功能需要 5 秒;下一次投票应该在 55 秒内还是 60 秒内进行?这里一个答案表明您想要使用 Rx 序列,另一个表明您可能想要使用 Periodic Scheudling。
下一个问题是,DoWork 是否返回值?目前看起来它没有*。在这种情况下,我认为最适合您的做法是利用定期调度程序(假设 Rx v2)。
var things = new []{
new Thing{Name="google", Uri = new Uri("http://google.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3},
new Thing{Name="bing", Uri = new Uri("http://bing.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3}
};
var scheduler = Scheduler.Default;
var scheduledWork = new CompositeDisposable();
foreach (var thing in things)
{
scheduledWork.Add( scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri)));
}
//Just showing that I can cancel this i.e. clean up my resources.
scheduler.Schedule(TimeSpan.FromSeconds(10), ()=>scheduledWork.Dispose());
现在这将安排每件事情定期处理(没有漂移),在它自己的时间间隔内并提供取消。
如果您愿意,我们现在可以将其升级为查询
var scheduledWork = from thing in things
select scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri));
var work = new CompositeDisposable(scheduledWork);
这些查询的问题是我们不满足StartTime
要求。令人讨厌的是,该Ccheduler.SchedulePeriodic
方法不提供重载以具有起始偏移量。
然而,Observable.Timer
运营商确实提供了这一点。它还将在内部利用非漂移调度功能。要重建查询,Observable.Timer
我们可以执行以下操作。
var urisToPoll = from thing in things.ToObservable()
from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
select thing;
var subscription = urisToPoll.Subscribe(t=>DoWork(t.Uri));
所以现在你有一个很好的界面,应该避免漂移。但是,我认为这里的工作是以串行方式完成的(如果同时调用许多 DoWork 动作)。
*理想情况下,我会尽量避免这样的副作用陈述,但我不能 100% 确定您的要求。
编辑
看来,对 DoWork 的调用必须是并行的,所以你需要做更多的事情。理想情况下,您将 DoWork 制作为 asnyc,但如果您做不到,我们可以在制作之前伪造它。
var polling = from thing in things.ToObservable()
from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
from result in Observable.Start(()=>DoWork(thing.Uri))
select result;
var subscription = polling.Subscribe(); //Ignore the bool results?