在我的 .NET 4.0 库中,我有一段代码通过网络发送数据并等待响应。为了不阻塞调用代码,该方法返回一个Task<T>
在收到响应时完成的,以便代码可以像这样调用该方法:
// Send the 'message' to the given 'endpoint' and then wait for the response
Task<IResult> task = sender.SendMessageAndWaitForResponse(endpoint, message);
task.ContinueWith(
t =>
{
// Do something with t.Result ...
});
底层代码使用TaskCompletionSource以便它可以等待响应消息,而不必启动线程只是让它闲置在那里直到响应到来:
private readonly Dictionary<int, TaskCompletionSource<IResult>> m_TaskSources
= new Dictionary<int, TaskCompletionSource<IResult>>();
public Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message)
{
var source = new TaskCompletionSource<IResult>(TaskCreationOptions.None);
m_TaskSources.Add(endpoint, source);
// Send the message here ...
return source.Task;
}
收到响应后,将按如下方式处理:
public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
if (m_TaskSources.ContainsKey(endpoint))
{
var source = m_TaskSources[endpoint];
source.SetResult(value);
m_TaskSources.Remove(endpoint);
}
}
现在我想添加一个超时,这样调用代码就不会无限期地等待响应。但是在 .NET 4.0 上这有点混乱,因为没有简单的方法可以让任务超时。所以我想知道 Rx 是否能够更容易地做到这一点。所以我想出了以下内容:
private readonly Dictionary<int, Subject<IResult>> m_SubjectSources
= new Dictionary<int, Subject<IResult>>();
private Task<IResult> SendMessageAndWaitForResponse(int endpoint, object message, TimeSpan timeout)
{
var source = new Subject<IResult>();
m_SubjectSources.Add(endpoint, source);
// Send the message here ...
return source.Timeout(timeout).ToTask();
}
public void CompleteWaitForResponseResponse(int endpoint, IResult value)
{
if (m_SubjectSources.ContainsKey(endpoint))
{
var source = m_SubjectSources[endpoint];
source.OnNext(value);
source.OnCompleted();
m_SubjectSources.Remove(endpoint);
}
}
这一切似乎都没有问题,但是我已经看到几个问题表明Subject
应该避免,所以现在我想知道是否有更多的 Rx-y 方法来实现我的目标。