1

我有一个非常简单的类,我用它来轮询目录中的新文件。它有位置,开始监视该位置的时间,以及何时再次检查的时间间隔(以小时为单位):

public class Thing
{
  public string         Name {get; set;}
  public Uri            Uri { get; set;}
  public DateTimeOffset StartTime {get; set;}
  public double         Interval {get; set;}
}

我是 Reactive Extensions 的新手,但我认为它正是在这里工作的正确工具。在开始时,以及随后的每个时间间隔,我只想调用一个 Web 服务来完成所有繁重的工作——我们将使用永远有创意public bool DoWork(Uri uri)的来表示它。

编辑: DoWork 是对 Web 服务的调用,它将检查新文件并在必要时移动它们,因此它的执行应该是异步的。如果完成则返回 true,否则返回 false。

如果我有这些Things 的完整集合,事情就会变得复杂。我不知道如何Observable.Timer()为每个人创建一个,并让他们都调用相同的方法。

edit2: Observable.Timer(DateTimeOffset, Timespan) 似乎非常适合为我在这里尝试做的事情创建一个 IObservable。想法?

4

3 回答 3

3

你需要有很多计时器吗?我假设如果你有 20 个东西的集合,那么我们将创建 20 个计时器全部在同一时间点触发?在同一个线程/调度程序上?

或者,也许您想DoWork在每个时期都对事物进行分析?

IE

from thing in things
from x in Observable.Interval(thing.Interval)
select DoWork(thing.Uri)

对比

Observable.Interval(interval)
.Select(_=>
    {
        foreach(var thing in Things)
        {
            DoWork(thing);
        }
    })

您可以通过多种方式在未来开展工作。

  • 您可以直接使用调度程序来安排将来要完成的工作。
  • 您可以使用 Observable.Timer 让序列在未来的指定时间产生一个值。
  • 您可以使用 Observable.Interval 来生成一个序列,该序列在每个指定的时间段内产生许多值。

所以这现在引入了另一个问题。如果您的轮询时间为 60 秒,而您的工作功能需要 5 秒;下一次投票应该在 55 秒内还是 60 秒内进行?这里一个答案表明您想要使用 Rx 序列,另一个表明您可能想要使用 Periodic Sc​​heudling。

下一个问题是,DoWork 是否返回值?目前看起来它没有*。在这种情况下,我认为最适合您的做法是利用定期调度程序(假设 Rx v2)。

var things = new []{
    new Thing{Name="google", Uri = new Uri("http://google.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3},
    new Thing{Name="bing", Uri = new Uri("http://bing.com"), StartTime=DateTimeOffset.Now.AddSeconds(1), Interval=3}
};
var scheduler = Scheduler.Default;
var scheduledWork = new CompositeDisposable();

foreach (var thing in things)
{
    scheduledWork.Add( scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri)));
}

//Just showing that I can cancel this i.e. clean up my resources.
scheduler.Schedule(TimeSpan.FromSeconds(10), ()=>scheduledWork.Dispose());

现在这将安排每件事情定期处理(没有漂移),在它自己的时间间隔内并提供取消。

如果您愿意,我们现在可以将其升级为查询

var scheduledWork = from thing in things
                    select scheduler.SchedulePeriodic(thing, TimeSpan.FromSeconds(thing.Interval), t=>DoWork(t.Uri));

var work = new CompositeDisposable(scheduledWork);

这些查询的问题是我们不满足StartTime要求。令人讨厌的是,该Ccheduler.SchedulePeriodic方法不提供重载以具有起始偏移量。

然而,Observable.Timer运营商确实提供了这一点。它还将在内部利用非漂移调度功能。要重建查询,Observable.Timer我们可以执行以下操作。

var urisToPoll = from thing in things.ToObservable()
                 from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
                 select thing;

var subscription = urisToPoll.Subscribe(t=>DoWork(t.Uri));

所以现在你有一个很好的界面,应该避免漂移。但是,我认为这里的工作是以串行方式完成的(如果同时调用许多 DoWork 动作)。

*理想情况下,我会尽量避免这样的副作用陈述,但我不能 100% 确定您的要求。

编辑 看来,对 DoWork 的调用必须是并行的,所以你需要做更多的事情。理想情况下,您将 DoWork 制作为 asnyc,但如果您做不到,我们可以在制作之前伪造它。

var polling = from thing in things.ToObservable()
              from _ in Observable.Timer(thing.StartTime, TimeSpan.FromSeconds(thing.Interval))
              from result in Observable.Start(()=>DoWork(thing.Uri))
              select result;

var subscription = polling.Subscribe(); //Ignore the bool results?
于 2013-08-08T14:41:05.680 回答
2

嗯。是否会DoWork产生您需要做某事的结果?我会假设是这样。你没有说,但我也会假设DoWork是同步的。

things.ToObservable()
  .SelectMany(thing => Observable
    .Timer(thing.StartTime, TimeSpan.FromHours(thing.Interval))
    .Select(_ => new { thing, result = DoWork(thing.Uri) }))
  .Subscribe(x => Console.WriteLine("thing {0} produced result {1}",
                                    x.thing.Name, x.result));

这是一个带有假设的版本Task<bool> DoWorkAsync(Uri)

things.ToObservable()
  .SelectMany(thing => Observable
    .Timer(thing.StartTime, TimeSpan.FromHours(thing.Interval))
    .SelectMany(Observable.FromAsync(async () =>
       new { thing, result = await DoWorkAsync(thing.Uri) })))
  .Subscribe(x => Console.WriteLine("thing {0} produced result {1}",
                                    x.thing.Name, x.result));

此版本假定DoWorkAsync将在间隔到期之前很久完成并启动一个新实例,因此不会防范DoWorkAsync同一Thing实例的并发运行。

于 2013-08-08T03:25:00.303 回答
1

这是我能想到的最清晰的方法:

foreach (var thing in things)
{
    Scheduler.ThreadPool.Schedule(
        thing.StartTime,
        () => {
                DoWork(thing.Uri); 
                Observable.Interval(TimeSpan.FromHours(thing.Interval))
                          .Subscribe(_ => DoWork(thing.Uri)));
              }
}

下面一个更实用:

things.ToObservable()
    .SelectMany(t => Observable.Timer(t.StartTime).Select(_ => t))
    .SelectMany(t => Observable.Return(t).Concat(
        Observable.Interval(TimeSpan.FromHours(t.Interval)).Select(_ => t)))
    .Subscribe(t => DoWork(t.Uri));

First SelectMany 在其预定时间创建一个事物流。第二个 SelectMany 采用此流并创建一个在每个间隔重复的新流。这需要连接到Observable.Return立即产生一个值,因为Observable.Interval' 的第一个值被延迟。

*注意,第一个解决方案需要 c#5 否则咬你。

于 2013-08-08T02:12:59.917 回答