1)我想你想看看Observable.Interval
。Observable.Timer
是一个单值序列,相反Observable.Interval
将在给定的时间间隔内继续产生值。如果对于每个项目/网址,此轮询周期是恒定的,那么这可能就是您想要的。
2)您可以将IEnumerable
(URL)和IObservable
(轮询事件)序列混合在一起。
3) 一个错误将终止一个序列。您可以通过将序列中的值投影 ( Select
) 到 WebRequest 的输出来利用这一点。如果 WebRequest 失败,序列将OnError
作为 OnError 有效负载传播异常。
然后慢慢地通过这个;
- 我们想要获取 URL/Period 对的列表,
- 在给定时间轮询 URL,
- 如果对该 URL 的任何调用失败,则停止轮询该 URL(并可能记录它)
- 如果 Windows 服务停止则停止轮询
让我们首先处理一个要轮询的 URL
protected override void OnStart(string [] args)
{
var resource = new {Url="http://www.bing.com", Period=TimeSpan.FromMinutes(3)};
var pollingSequence = Observable.Interval(resource.Period)
.Select(_=>Request(resource.Url));
var subscription = pollingSequence.Subscribe(
response=>Log(response),
ex=>Log(ex)
));
_subscriptions = new CompositeDisposable(subscription);
}
protected override void OnStop()
{
Dispose();
}
private bool Request(string url)
{
//TODO:
}
public void Dispose()
{
_subscriptions.Dispose();
}
为了可视化这一点,我们可以使用“大理石图”。这里每个字符空间代表 1 分钟。OnNext 表示为“o”(大理石)。
bing --o--o--o--o- (every 3rd minute)
更准确地说,我们实际上是通过事件获取值(即使我们忽略了值)
bing --0--1--2--3-
然后我们获取每个事件并将其投影到请求中,因此序列现在转到此(其中 'r' 表示来自请求的响应)
bing --r--r--r--r-
如果任何请求失败,则序列将终止,例如这里 3 请求失败,我们显示带有“X”的 OnError
bing --r--r--X
现在我们可以将示例扩展到资源列表,即IEnumerable<IObservable<T>>
protected override void OnStart(string [] args)
{
var resources = new[] {
new {Url="http://www.bing.com", Period=TimeSpan.FromMinutes(3)},
new {Url="http://www.google.com", Period=TimeSpan.FromMinutes(2)},
new {Url="http://www.yahoo.com", Period=TimeSpan.FromMinutes(5)},
new {Url="http://www.stackoverflow.com", Period=TimeSpan.FromMinutes(2)}
};
//The same as before, but now we create a polling sequence per resource.
var pollingSequences = from resource in resources
select Observable.Interval(resource.Period)
.Select(_=>Request(resource.Url));
//Now we cant subscribe to an `IEnumerable<IObservable<T>>` *,
// but we can subscribe to each sequence in it.
// This turns our queries (IE<IO<T>>) into subscriptions (IE<IDisposable>).
var subscriptions = pollingSequences.Select(x =>
x.Subscribe(
response=>Log(response),
ex=>Log(ex)
));
_subscriptions = new CompositeDisposable(subscriptions);
}
在这里,我们有一个服务将开始调用该Request
方法(供您实现),在为 Url 指定的时间段传递 Url。如果 Request 方法抛出,则 Url 将不再被轮询。如果您停止服务,则订阅将被处理,并且不会再发生轮询。
为了可视化这一点,我们可以使用“大理石图”。因此,我们从每个“资源”(空间中的数据)的“行”开始,即。var resources = new[] { ...};
bing
google
yahoo
SO
接下来我们认为每一行也有一个轮询事件序列(数据及时)。从上面的代码(使用 1 char = 1 分钟)
bing --1--2--3- (every 3rd minute)
google -1-2-3-4-5 (every 2nd minute)
yahoo ----1----2 (every 5 minutes)
SO -1-2-3-4-5 (every 2nd minute)
- 你不能订阅
IEnumerable<IObservable<T>>
,但是有像 Concat 和 Merge 这样的 Rx 方法来处理这种类型的数据。然而,我们不想使用这些,因为它们将序列扁平化为单个序列,这意味着如果任何失败,所有轮询都将停止。