6

我最近一直在玩新的 Async CTP,但我遇到了一种情况,我不确定如何进行。

在我当前的代码库中,我使用了“工作”和“工作管理器”的概念。作业的存在仅用于处理初始消息、发送响应,然后等待响应。

我已经有基于同步套接字的现有代码,其中网络线程正在等待数据到达,然后将其传递给事件处理程序,最终传递给作业管理器。

作业管理器查找将处理该消息的作业,并将其传递。

所以场景是这样的:

  1. 作业经理收到新消息并启动作业。
  2. 作业启动、处理消息并发送回复消息。
  3. 此时,作业将等待对答复的响应。

这是一个伪代码示例:

class MyJob : Job
{
    public override void RunJob( IPacketMsg packet )
    {
        // handle packet

        var myReply = new Packet();
        SendReply( myReply );

        await GetResponse();
    }
}

但我不完全确定如何在第 3 步继续。作业管理器将获得响应,然后将其交给正在运行的作业。但我不确定如何让工作等待响应。

我考虑过创建一个等待的任务,它只是阻塞在 WaitHandle 上,但这是最好的解决方案吗?

在这种情况下,我还能做些什么吗?

编辑 关于异步 CTP 的主题,在不使用 UI 的情况下会发生什么。我已经阅读了 Eric Lippert 的 Async 博客,但我不相信它曾经触及过在没有 UI 线程的情况下一切如何在后台工作的主题(它是从后台工作人员分离出来还是......?)

4

3 回答 3

5
  1. 作业经理收到新消息并启动作业。
  2. 作业启动、处理消息并发送回复消息。
  3. 此时,作业将等待对答复的响应。

首先,我应该提一下 Async CTP可以很好地处理异步操作,但异步事件处理不了那么多。您可能需要考虑基于 Rx 的方法。但是,让我们暂时继续使用 Async CTP。

您有两个基本选项来创建任务:

  • 与代表。例如,Task.Factory.StartNew将在线程池上运行委托。自定义任务工厂和调度程序为您提供更多任务委托选项(例如,指定委托必须在 STA 线程上运行)。
  • 没有代表。例如,TaskFactory.FromAsync包装一个现有的Begin/End方法对,TaskEx.FromResult返回一个“未来的常量”,并且TaskCompletionSource可以用来Task显式地控制一个(既可以在内部使用FromAsync也可以在内部使用)。FromResultTCS

如果作业处理受 CPU 限制,则将其传递给Task.Factory.StartNew. 我将假设作业处理受 CPU 限制。

作业管理器伪代码:

// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
  IJob job = ..;
  Task.Factory.StartNew(job.RunJob(message));
}

// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;

// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
  var tcs = new TaskCompletionSource<IResponse>();
  responseTasks.Add(replyId, tcs);
  return tcs.Task;
}

// Responds to a new response by completing and removing its task.
private void RespondToResponse(IResponse response)
{
  var tcs = responseTasks[response.ReplyId];
  responseTasks.Remove(response.ReplyId);
  tcs.TrySetComplete(response);
}

这个想法是,工作经理还管理一个未解决的响应列表。为了实现这一点,我引入了一个简单的int回复标识符,作业管理器可以使用它来确定哪个回复与哪个回复对应。

现在作业可以像这样工作:

public override void RunJob(IPacketMsg packet)
{
  // handle packet
  var myReply = new Packet();
  var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
  SendReply(myReply);

  await response;
}

由于我们将作业放在线程池线程上,因此有一些棘手的事情:

  1. GetResponseForReplyAsync必须在发送回复之前调用(注册任务),然后再await编辑。这是为了避免在我们有机会注册之前可能会发送回复并收到回复的情况。
  2. RespondToResponse将在完成之前删除任务注册,以防完成任务导致发送具有相同 ID 的另一个回复。

如果作业足够短以至于不需要将它们放在线程池线程上,则可以简化解决方案。

于 2011-09-12T21:02:35.037 回答
3

关于异步 CTP,在不使用 UI 的情况下会发生什么。我已经阅读了 Eric Lippert 的 Async 博客,但我不相信它曾经触及过在没有 UI 线程的情况下一切如何在后台工作的主题(它是从后台工作人员分离出来还是......?)

await将返回其同步上下文。在 UI 进程中,这是一个 UI 消息循环。在 ASP.NET 中,这是 ASP.NET 线程池。在其他情况下(控制台应用程序和 Win32 服务),没有上下文,因此继续排队到ThreadPool. 这通常不是所需的行为,因此我编写了一个AsyncContext可以在这些情况下使用的类。

BackgroundWorker未使用。在像您这样的服务器端场景中,根本没有后台线程并不少见。

于 2011-09-12T20:17:33.903 回答
1

您只需将事件处理程序的其余部分与等待模式连接起来,如下所示:

 public async void RunJob(IPacketMsg msg)
 {
     // Do Stuff

     var response = await GetResponse();

     // response is "string", not "Task<string>"

     // Do More Stuff
 }

 public Task<string> GetResponse()
 {
     return Task.Factory.StartNew(() =>
        {
             _networkThingy.WaitForDataAvailable();

             return _networkThingy.ResponseString;
        });
 }

当您的获取响应任务完成时,该方法的其余部分将在您当前的同步上下文中执行。但是,在此之前,您的方法执行会产生(因此等待后的任何代码都不会运行,直到 GetResponse 中启动的任务完成)

于 2011-09-12T20:24:31.910 回答