3

我是 Rx 新手,所以我希望你能忍受我。作为我自己的练习,也可能是我可以为同事演示的示例,我为 Dns.BeginGetHostEntry()/EndGetHostEntry() 完成了两个包装类:DnsResolver 和 DnsResolverRx。

每个类都有一个公共静态方法:

void Resolve(string host, Action<IPHostEntry> getResult, Control context = null);

...以及使其有趣的一些附加要求: 1. 如果提供了上下文,则必须在关联线程上调用 getResult 2. 同一主机的先前结果缓存 MaxResultAge 秒。

非 Rx 版本工作正常,但与这个问题并不真正相关。Rx 版本如下所示:

class DnsResolverRx
{
  static Func<string, IObservable<IPHostEntry>> _resolver = Observable.FromAsyncPattern<string, IPHostEntry>(Dns.BeginGetHostEntry, Dns.EndGetHostEntry);

  public static void Resolve(string host, Action<IPHostEntry> setResult, Control context = null)
  {
    IObservable<IPHostEntry> result;
    result = _cache.GetOrCreateValue( // a trivial TryGetValue wrapper
      host,
      () => _resolver(host)
        .Do(e => Debug.WriteLine("resolved"))
        .Repeat()
        .Do(e => Debug.WriteLine("repeated"))
        .Replay(MaxResultAge)
        .RefCount()
    );

    result = result.Take(1); // each request needs only 1 result

    if (context != null)
      result = result.ObserveOn(context);

    result.Subscribe(
      entry => setResult(entry),
      ex => setResult(null)
    );
  }
}

static void Main(string[] args)
{
  for (int i=0; i<10; ++i)
  {
    int num = i;
    Debug.WriteLine("start" + num);
    DnsResolverRx.Resolve("chief", e => Debug.WriteLine("done"+num));
    Thread.Sleep(200);
  }

  Console.ReadLine();
}

Replay() 似乎有效,因此 MaxResultAge 中的第一个请求都已完成,重用相同的结果。然而,下一个请求触发了Repeat(),我最终得到了一个看似无止境的循环:

start0
start1
start2
start3
start4
start5
resolved
repeated
done0
done1
done2
done3
done4
done5
start6
resolved
repeated
resolved
repeated
... and so on ad infinitum

谁能告诉我发生了什么,我做错了什么?

4

1 回答 1

6

好吧 - 刚刚意识到 FromAsyncPatterns 是一次性的,基本上无法重新启动,并且它们在引用时开始,而不是在订阅时开始。

更换

_resolver(host)

Observable.Defer(() => _resolver(host))

似乎可以解决问题。

如果没有人反对(而且我没有发现其他问题),我很快就会接受这个答案。

于 2012-04-27T21:14:37.957 回答