2

在我的 .NET 4.0 库中,我有一段代码通过网络发送数据并等待响应。为了不阻塞调用代码,该方法返回一个Task<T>在收到响应时完成的,以便代码可以像这样调用该方法:

// Send the 'message' to the given 'endpoint' and then wait for the response
Task<IResult> task = sender.SendMessageAndWaitForResponse(endpoint, message);
task.ContinueWith(
    t => 
    {
        // Do something with t.Result ...
    });

底层代码使用TaskCompletionSource以便它可以等待响应消息,而不必启动线程只是让它闲置在那里直到响应到来:

private readonly Dictionary<int, TaskCompletionSource<IResult>> m_TaskSources
    = new Dictionary<int, TaskCompletionSource<IResult>>();

public Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message)
{
    var source = new TaskCompletionSource<IResult>(TaskCreationOptions.None);
    m_TaskSources.Add(endpoint, source);

    // Send the message here ...

    return source.Task;
}

收到响应后,将按如下方式处理:

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_TaskSources.ContainsKey(endpoint))
    {
        var source = m_TaskSources[endpoint];
        source.SetResult(value);
        m_TaskSources.Remove(endpoint);
    }
}

现在我想添加一个超时,这样调用代码就不会无限期地等待响应。但是在 .NET 4.0 上这有点混乱,因为没有简单的方法可以让任务超时。所以我想知道 Rx 是否能够更容易地做到这一点。所以我想出了以下内容:

private readonly Dictionary<int, Subject<IResult>> m_SubjectSources
    = new Dictionary<int, Subject<IResult>>();

private Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message, TimeSpan timeout)
{
    var source = new Subject<IResult>();
    m_SubjectSources.Add(endpoint, source);

    // Send the message here ...

    return source.Timeout(timeout).ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    if (m_SubjectSources.ContainsKey(endpoint))
    {
        var source = m_SubjectSources[endpoint];
        source.OnNext(value);
        source.OnCompleted();
        m_SubjectSources.Remove(endpoint);
    }
}

这一切似乎都没有问题,但是我已经看到几个问题表明Subject应该避免,所以现在我想知道是否有更多的 Rx-y 方法来实现我的目标。

4

1 回答 1

3

避免Subject在 Rx 中使用的建议通常被夸大了。Rx 中必须有事件源,它可以是Subject.

Subject 的问题通常是在两个 Rx 查询之间使用时,否则可能会被连接,或者已经有明确定义的转换IObservable<T>(例如Observable.FromEventXXXorObservable.FromAsyncXXX等​​)。

如果你愿意,你可以用下面的方法去掉Dictionaryand multiple Subjects。这使用单个主题并将过滤后的查询返回给客户端。

它本身并不是“更好” ,这是否有意义将取决于您的场景的具体情况,但它可以节省大量的主题,并为您提供了一个很好的选择来监控单个流中的所有结果。如果您是串行分派结果(比如从消息队列中),这可能是有道理的。

// you only need to synchronize if you are receiving results in parallel
private readonly ISubject<Tuple<int,IResult>, Tuple<int,IResult>> results =
    Subject.Synchronize(new Subject<Tuple<int,IResult>>());

private Task<IResult> SendMessageAndWaitForResponse(
    int endpoint, object message, TimeSpan timeout)
{           
    // your message processing here, I'm just echoing a second later
    Task.Delay(TimeSpan.FromSeconds(1)).ContinueWith(t => {
        CompleteWaitForResponseResponse(endpoint, new Result { Value = message }); 
    });

    return results.Where(r => r.Item1 == endpoint)
                  .Select(r => r.Item2)
                  .Take(1)
                  .Timeout(timeout)
                  .ToTask();
}

public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
    results.OnNext(Tuple.Create(endpoint,value));
}

我为这样的结果定义了一个类:

public class Result : IResult
{
    public object Value { get; set; }
}

public interface IResult
{
    object Value { get; set; }
}

编辑 - 回应评论中的其他问题。

  • 无需处理单个主题 - 它不会泄漏,并且在超出范围时将被垃圾收集。

  • ToTask确实接受取消令牌-但这实际上是为了从客户端取消。

  • 如果远程端断开连接,您可以将错误发送给所有客户端results.OnError(exception);- 您需要同时实例化一个新的主题实例。

就像是:

private void OnRemoteError(Exception e)
{
    results.OnError(e);        
}

这将以预期的方式显示为所有客户端的错误任务。

它也是相当线程安全的,因为订阅先前发送过的主题的客户端OnError将立即收到错误 - 从那时起它就死了。然后准备好后,您可以使用以下命令重新初始化:

private void OnInitialiseConnection()
{
    // ... your connection logic

    // reinitialise the subject...
    results = Subject.Synchronize(new Subject<Tuple<int,IResult>>());
}

对于个别客户错误,您可以考虑:

  • 扩展您的IResult界面以将错误作为数据包含在内
  • 然后,您可以通过在SendMessageAndWaitForResponse. 例如,以及 IResult 的 Exception 和 HasError 属性,以便您可以执行以下操作:

    return results.Where(r => r.Item1 == endpoint)
                .SelectMany(r => r.Item2.HasError
                    ? Observable.Throw<IResult>(r.Item2.Exception)
                    : Observable.Return(r.Item2))
                .Take(1)
                .Timeout(timeout)
                .ToTask();
    
于 2014-04-03T09:49:02.477 回答