0

我正在使用 Reactive Extensions 创建匿名 Observable 来执行 Web 请求,对其进行后处理并返回一些值。这种技术帮助我将价值获取的机制与使用这些值的其他类区分开来。Web 请求经常执行,因此有一个计时器可以使用这种技术构造和调用请求。我构建 Observable 如下:

Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse)

实际上,端点并不总是可用的。在 web 请求超时的情况下,Subscribe()方法中指定的回调永远不会被调用。我正在使用Timeout()Rx 扩展来处理这种情况。我也在做请求结果的后处理。这是我为解决该问题而实施的一段仿真代码:

var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...)

然后我实现了处理方法的返回值Subscribe()以从 Observable 取消订阅回调委托。实际上我有一个三元组的集合<IObservable obs, IDisposable disp, bool RequestComplete>Subscribe()方法回调设置RequestComplete = true,并且随着时间的推移,如果不再需要所有一次性用品,它们就会被处置,即已经调用了回调。一切似乎都很好,但我注意到收藏在不断增长,我不知道为什么。

所以我实现了这种情况的仿真。此代码没有处理部分 - 我将其替换为简单的请求计数器:

public static class RequestCounter
{
    private static object syncRoot = new object();

    private static int count = 0;

    public static void Increment()
    {
        lock (syncRoot)
        {
            count++;
        }
    }

    public static void Decrement()
    {
        lock (syncRoot)
        {
            count--;
        }
    }

    public static int RequestCount
    {
        get { return count; }
    }
}

这是测试本身:

public void RunTest()
    {
        while (true)
        {
            var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
            var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
            RequestCounter.Increment();
            var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
                res =>
                {
                    RequestCounter.Decrement();
                    Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
                },
                ex =>
                {
                    RequestCounter.Decrement();
                    Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
                },
                () =>
                {
                });
        }
    }

在网络请求计数器增加之前。请求超时后,它会递减。但是请求计数器会无限增长。你能解释一下我错过了什么吗?

4

1 回答 1

1

对可观察序列的订阅是异步的。您的 while 循环会尽可能快地运行。增量将超过延迟的减量。

如果您将 Rx v2.0 与 .NET 4.5 一起使用,您可以使用 await 来运行您的循环,并在上一个请求完成时继续执行以下迭代:

while (true)
{
    var obs = ...;

    RequestCounter.Increment();

    await obs.Select(...).Timeout(...);

    RequestCounter.Decrement();

    Console.WriteLine(...);
}

更大的问题是,如果请求完成,您为什么要尝试处理订阅?当序列达到终端状态(OnError、OnCompleted)时,Rx 会自行清理,因此您似乎可以简单地将 IDisposable 对象放在地板上而不必担心这些。

于 2012-10-05T09:36:39.187 回答