0

尽管社区在这里这里这里这里提供了耐心的帮助——我担心我离真正的理解还差得远。我试图将IntroToRx中的概念牢牢地植入我的大脑,但套用亚瑟王的话:“这种新的学习使我感到困惑。再次解释一下羊的膀胱和地震。” 在我看来,这就是我们可能正在谈论的内容。

我会把它简化为需求(已经改变 - .NET 4.0 是最重要的),而不是用我自己的(可能有缺陷的)实现走错路:

  1. 我需要使用 .NET Framework 4.0 来编写 Windows 服务。
  2. 我需要我的 Windows 服务来调用 Web 服务 (WS),并将多个 URL 之一传递给它,但每次调用只传递一个 URL。
  3. 我需要能够为每个 URL(1 到n)定义单独的间隔来调用 WS。例如:对于 URL1,我可能每小时调用一次 WS,对于 URL2,每四个小时调用一次,等等。
  4. 如果 Web 服务返回该 URL 的错误,我需要能够停止为特定 URL 调用 Web 服务。

就是这样了。据我所知,这是一个完美的 Rx 场景。我很确定我想Observable.Timer为序列创建 s,但是如何在错误时禁用计时器?这可能是一个CancellationToken等待发生的事情(或者我只是完成计时器),但我似乎无法接受这些示例并将它们组合成一个可靠的、可理解的解决方案。

Whaddaya 说,SO:有人想拉着这个老人的手,用小话解释做这种废话的正确方法吗?如果我没有很好地解释它,或者我遗漏了一些东西,我会很乐意编辑。

谢谢。

4

1 回答 1

2

1)我想你想看看Observable.IntervalObservable.Timer是一个单值序列,相反Observable.Interval将在给定的时间间隔内继续产生值。如果对于每个项目/网址,此轮询周期是恒定的,那么这可能就是您想要的。

2)您可以将IEnumerable(URL)和IObservable(轮询事件)序列混合在一起。

3) 一个错误将终止一个序列。您可以通过将序列中的值投影 ( Select) 到 WebRequest 的输出来利用这一点。如果 WebRequest 失败,序列将OnError作为 OnError 有效负载传播异常。

然后慢慢地通过这个;

  • 我们想要获取 URL/Period 对的列表,
  • 在给定时间轮询 URL,
  • 如果对该 URL 的任何调用失败,则停止轮询该 URL(并可能记录它)
  • 如果 Windows 服务停止则停止轮询

让我们首先处理一个要轮询的 URL

protected override void OnStart(string [] args)
{
    var resource = new {Url="http://www.bing.com", Period=TimeSpan.FromMinutes(3)};

    var pollingSequence =  Observable.Interval(resource.Period)
                                     .Select(_=>Request(resource.Url));
    var subscription = pollingSequence.Subscribe(
            response=>Log(response),
            ex=>Log(ex)
        ));
    _subscriptions = new CompositeDisposable(subscription);
}

protected override void OnStop()
{
    Dispose();
}

private bool Request(string url)
{
    //TODO:
}

public void Dispose()
{
    _subscriptions.Dispose();
}

为了可视化这一点,我们可以使用“大理石图”。这里每个字符空间代表 1 分钟。OnNext 表示为“o”(大理石)。

    bing    --o--o--o--o-       (every 3rd minute)

更准确地说,我们实际上是通过事件获取值(即使我们忽略了值)

    bing    --0--1--2--3-       

然后我们获取每个事件并将其投影到请求中,因此序列现在转到此(其中 'r' 表示来自请求的响应)

    bing    --r--r--r--r-       

如果任何请求失败,则序列将终止,例如这里 3 请求失败,我们显示带有“X”的 OnError

    bing    --r--r--X

现在我们可以将示例扩展到资源列表,即IEnumerable<IObservable<T>>

protected override void OnStart(string [] args)
{
    var resources = new[] { 
        new {Url="http://www.bing.com", Period=TimeSpan.FromMinutes(3)},
        new {Url="http://www.google.com", Period=TimeSpan.FromMinutes(2)},
        new {Url="http://www.yahoo.com", Period=TimeSpan.FromMinutes(5)},
        new {Url="http://www.stackoverflow.com", Period=TimeSpan.FromMinutes(2)}
    };

    //The same as before, but now we create a polling sequence per resource.
    var pollingSequences = from resource in resources
                        select Observable.Interval(resource.Period)
                                            .Select(_=>Request(resource.Url));

    //Now we cant subscribe to an `IEnumerable<IObservable<T>>` *, 
    //  but we can subscribe to each sequence in it.
    // This turns our queries (IE<IO<T>>) into subscriptions (IE<IDisposable>).
    var subscriptions = pollingSequences.Select(x => 
        x.Subscribe(
            response=>Log(response),
            ex=>Log(ex)
        ));
    _subscriptions = new CompositeDisposable(subscriptions);
}

在这里,我们有一个服务将开始调用该Request方法(供您实现),在为 Url 指定的时间段传递 Url。如果 Request 方法抛出,则 Url 将不再被轮询。如果您停止服务,则订阅将被处理,并且不会再发生轮询。

为了可视化这一点,我们可以使用“大理石图”。因此,我们从每个“资源”(空间中的数据)的“行”开始,即。var resources = new[] { ...};

    bing
    google
    yahoo
    SO

接下来我们认为每一行也有一个轮询事件序列(数据及时)。从上面的代码(使用 1 char = 1 分钟)

    bing    --1--2--3-      (every 3rd minute)
    google  -1-2-3-4-5      (every 2nd minute)
    yahoo   ----1----2      (every 5 minutes)
    SO      -1-2-3-4-5      (every 2nd minute)
  • 你不能订阅IEnumerable<IObservable<T>>,但是有像 Concat 和 Merge 这样的 Rx 方法来处理这种类型的数据。然而,我们不想使用这些,因为它们将序列扁平化为单个序列,这意味着如果任何失败,所有轮询都将停止。
于 2013-09-12T12:56:12.417 回答