我正在使用 Reactive Extensions 创建匿名 Observable 来执行 Web 请求,对其进行后处理并返回一些值。这种技术帮助我将价值获取的机制与使用这些值的其他类区分开来。Web 请求经常执行,因此有一个计时器可以使用这种技术构造和调用请求。我构建 Observable 如下:
Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse)
实际上,端点并不总是可用的。在 web 请求超时的情况下,Subscribe()
方法中指定的回调永远不会被调用。我正在使用Timeout()
Rx 扩展来处理这种情况。我也在做请求结果的后处理。这是我为解决该问题而实施的一段仿真代码:
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...)
然后我实现了处理方法的返回值Subscribe()
以从 Observable 取消订阅回调委托。实际上我有一个三元组的集合<IObservable obs, IDisposable disp, bool RequestComplete>
。Subscribe()
方法回调设置RequestComplete = true
,并且随着时间的推移,如果不再需要所有一次性用品,它们就会被处置,即已经调用了回调。一切似乎都很好,但我注意到收藏在不断增长,我不知道为什么。
所以我实现了这种情况的仿真。此代码没有处理部分 - 我将其替换为简单的请求计数器:
public static class RequestCounter
{
private static object syncRoot = new object();
private static int count = 0;
public static void Increment()
{
lock (syncRoot)
{
count++;
}
}
public static void Decrement()
{
lock (syncRoot)
{
count--;
}
}
public static int RequestCount
{
get { return count; }
}
}
这是测试本身:
public void RunTest()
{
while (true)
{
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
RequestCounter.Increment();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
res =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
ex =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
() =>
{
});
}
}
在网络请求计数器增加之前。请求超时后,它会递减。但是请求计数器会无限增长。你能解释一下我错过了什么吗?