5

我正在尝试使用 Reactive Extensions 来获取一堆 RSS 项目。我基于 Tim Greenfield 的博客文章:Silverlight Rx DataClient inside MVVM

我在桌面应用程序中使用它,但代码是相似的。

我遇到的问题是理解Retry()功能。它似乎没有按照我的期望和我期望的那样做。

var items = new List<RssItem>();
WebHelper.DownloadXmlFileAsync<RssItem>(new Uri(URI), "item")
    .Retry(2)
    .Finally(PublishResults)
    .Subscribe(items.Add, ProcessError, () => ProcessCompleted(items));

当我传入一个有效的 URI 时,这可以正常工作。当我在 URI 中打错字时,它会通过函数报告 404 错误ProcessError(),正如人们所期望的那样,但它只报告了一次。我原以为它会显示此错误两次。

因此,该Retry()函数似乎没有在我的 Web 请求上运行,但看起来它实际上适用于传递给Subscribe(). 不过,我在这里可能是错的。

如何确保Retry()调用适用于 Web 请求?

额外代码:

public static class WebHelper
{
    public static HttpWebRequest CreateHttp(Uri uri)
    {
        return CreateHttp(uri, "GET");
    }

    public static HttpWebRequest CreateHttp(Uri uri, string method)
    {
        if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps)
        {
            throw new ArgumentException("The specified URI does not use HTTP or HTTPS.", "uri");
        }

        var request = (HttpWebRequest)WebRequest.Create(uri);
        request.Method = method;

        return request;
    }

    public static IObservable<T> DownloadXmlFileAsync<T>(Uri uri, string elementName) where T : class
    {
        return (from request in Observable.Return(CreateHttp(uri))
                from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
                let stream = response.GetResponseStream()
                where stream != null
                from item in XmlReader.Create(stream).GetXmlItem<T>(elementName).ToObservable()
                select item);
    }
}

public static class XmlExtensions
{
    public static IEnumerable<T> GetXmlItem<T>(this XmlReader reader, string elementName) where T : class
    {
        var serializer = new XmlSerializer(typeof (T));
        while (reader.GoToElement(elementName))
        {
            yield return serializer.Deserialize(reader) as T;
        }
    }

    public static bool GoToElement(this XmlReader reader, string elementName)
    {
        do
        {
            if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName)
            {
                return true;
            }
        } while (reader.Read());

        return false;
    }
}

XmlRoot("item")]
public class RssItem
{
    [XmlElement("description")]
    public string Description { get; set; }

    [XmlElement("link")]
    public string Link { get; set; }

    [XmlElement("pubDate")]
    public string PublishDate { get; set; }

    [XmlElement("title")]
    public string Title { get; set; }

    public override string ToString()
    {
        return string.Format("Title: {0}", Title);
    }
}
4

2 回答 2

16

序列的 Rx 文法定义为:

OnNext* (OnError | OnCompleted)?

接收到一个OnError或一个OnCompleted信号表示序列的结束,并且管道上的订阅预计将被拆除。

在运营商的背景下:

observable.Retry(n)observableis:收到an后重新订阅OnError,最多n次。

observable.Finally(action)是:action接收时执行OnError|OnCompleted

重试旨在与冷可观察对象一起使用(Lee Campbell对此有一篇很好的文章),订阅本质上会导致源启动。

类似Repeat地,Retry除了它在接收到OnCompleted.

为了看到这一点,我们可以创建一个 observable,它会在前 n 次“失败”,然后成功。现在看一些代码:

    private static IObservable<int> ErrorProducer(int i)
    {
        int count = 0;
        return Observable.Create<int>(observer =>
        {
            Console.WriteLine("Doing work");

            if (count++ < i)
            {
                Console.WriteLine("Failed");
                observer.OnError(new Exception());
            }
            else
            {
                Console.WriteLine("Done");
                observer.OnNext(count);
                observer.OnCompleted();                    
            }
            return Disposable.Empty;
        });
    }

对于总是失败的生产者:

      print(ErrorProducer(3).Retry(2));

给出:

Doing work <-- Subscription
Failed
Doing work <-- Resubscription
Failed
OnError(System.Exception)
Finally

对于最终成功的生产者:

print(ErrorProducer(2).Retry(3));

Doing work
Failed
Doing work
Failed
Doing work
Done
OnNext(3) <-- Succeeded
OnCompleted()
Finally

如果您希望进程错误函数在重试时被调用多次,则应将其放在Retry.

IE,seq.Do(value => { }, exception => { }).Retry(n)

您可以阅读有关使用热/冷 observables 以及使用带有 Rx 的异步模式来阐明您的理解的内容。

于 2012-10-18T16:35:46.853 回答
4

Asti 的回答很到位。我只是想添加一些额外的信息,以防您想知道如何为单个逻辑序列公开多个错误。

正如 Asti 指出的那样,您只能终止一次序列。此终止可以是错误或完成 (OnError|OnCompleted)。

然而,没有什么能阻止你拥有嵌套的可观察序列!如果您确实希望看到多条错误消息,请考虑返回IObservable<IObservable<T>>. 内部序列是数据序列(您当前拥有的序列)。当这个序列出错时,就不能再使用了,所以外序列可以产生一个新的内数据序列。

这可能看起来有点奇怪,但它是 Rx 中支持的概念,因为 Merge 和 Switch 等运算符已经满足了这些嵌套序列。这种 Rx 风格在我的书 IntroToRx 中的嵌套序列段落中有所涉及,然后在序列的巧合章节中再次更详细地介绍

我希望这可以帮助您了解将来如何进行 Rx 的其他可能性。

于 2012-10-22T09:00:06.607 回答