我是 Rx 新手,所以我希望你能忍受我。作为我自己的练习,也可能是我可以为同事演示的示例,我为 Dns.BeginGetHostEntry()/EndGetHostEntry() 完成了两个包装类:DnsResolver 和 DnsResolverRx。
每个类都有一个公共静态方法:
void Resolve(string host, Action<IPHostEntry> getResult, Control context = null);
...以及使其有趣的一些附加要求: 1. 如果提供了上下文,则必须在关联线程上调用 getResult 2. 同一主机的先前结果缓存 MaxResultAge 秒。
非 Rx 版本工作正常,但与这个问题并不真正相关。Rx 版本如下所示:
class DnsResolverRx
{
static Func<string, IObservable<IPHostEntry>> _resolver = Observable.FromAsyncPattern<string, IPHostEntry>(Dns.BeginGetHostEntry, Dns.EndGetHostEntry);
public static void Resolve(string host, Action<IPHostEntry> setResult, Control context = null)
{
IObservable<IPHostEntry> result;
result = _cache.GetOrCreateValue( // a trivial TryGetValue wrapper
host,
() => _resolver(host)
.Do(e => Debug.WriteLine("resolved"))
.Repeat()
.Do(e => Debug.WriteLine("repeated"))
.Replay(MaxResultAge)
.RefCount()
);
result = result.Take(1); // each request needs only 1 result
if (context != null)
result = result.ObserveOn(context);
result.Subscribe(
entry => setResult(entry),
ex => setResult(null)
);
}
}
static void Main(string[] args)
{
for (int i=0; i<10; ++i)
{
int num = i;
Debug.WriteLine("start" + num);
DnsResolverRx.Resolve("chief", e => Debug.WriteLine("done"+num));
Thread.Sleep(200);
}
Console.ReadLine();
}
Replay() 似乎有效,因此 MaxResultAge 中的第一个请求都已完成,重用相同的结果。然而,下一个请求触发了Repeat(),我最终得到了一个看似无止境的循环:
start0
start1
start2
start3
start4
start5
resolved
repeated
done0
done1
done2
done3
done4
done5
start6
resolved
repeated
resolved
repeated
... and so on ad infinitum
谁能告诉我发生了什么,我做错了什么?