- 作业经理收到新消息并启动作业。
- 作业启动、处理消息并发送回复消息。
- 此时,作业将等待对答复的响应。
首先,我应该提一下 Async CTP可以很好地处理异步操作,但异步事件处理不了那么多。您可能需要考虑基于 Rx 的方法。但是,让我们暂时继续使用 Async CTP。
您有两个基本选项来创建任务:
- 与代表。例如,
Task.Factory.StartNew
将在线程池上运行委托。自定义任务工厂和调度程序为您提供更多任务委托选项(例如,指定委托必须在 STA 线程上运行)。
- 没有代表。例如,
TaskFactory.FromAsync
包装一个现有的Begin
/End
方法对,TaskEx.FromResult
返回一个“未来的常量”,并且TaskCompletionSource
可以用来Task
显式地控制一个(既可以在内部使用FromAsync
也可以在内部使用)。FromResult
TCS
如果作业处理受 CPU 限制,则将其传递给Task.Factory.StartNew
. 我将假设作业处理受 CPU 限制。
作业管理器伪代码:
// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
IJob job = ..;
Task.Factory.StartNew(job.RunJob(message));
}
// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;
// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
var tcs = new TaskCompletionSource<IResponse>();
responseTasks.Add(replyId, tcs);
return tcs.Task;
}
// Responds to a new response by completing and removing its task.
private void RespondToResponse(IResponse response)
{
var tcs = responseTasks[response.ReplyId];
responseTasks.Remove(response.ReplyId);
tcs.TrySetComplete(response);
}
这个想法是,工作经理还管理一个未解决的响应列表。为了实现这一点,我引入了一个简单的int
回复标识符,作业管理器可以使用它来确定哪个回复与哪个回复对应。
现在作业可以像这样工作:
public override void RunJob(IPacketMsg packet)
{
// handle packet
var myReply = new Packet();
var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
SendReply(myReply);
await response;
}
由于我们将作业放在线程池线程上,因此有一些棘手的事情:
GetResponseForReplyAsync
必须在发送回复之前调用(注册任务),然后再await
编辑。这是为了避免在我们有机会注册之前可能会发送回复并收到回复的情况。
RespondToResponse
将在完成之前删除任务注册,以防完成任务导致发送具有相同 ID 的另一个回复。
如果作业足够短以至于不需要将它们放在线程池线程上,则可以简化解决方案。