3

我的代码目前有以下 10 个工作线程。每个工作线程继续从队列中轮询作业,然后处理长时间运行的作业。

for (int k=0; k<10; k++)
{
  Task.Factory.StartNew(() => DoPollingThenWork(), TaskCreationOptions.LongRunning);
}

void DoPollingThenWork()
{
  while (true)
  {
    var msg = Poll();
    if (msg != null)
    {
      Thread.Sleep(3000); // process the I/O bound job
    }
  }
}

我正在重构底层代码以使用异步/等待模式。我想我可以将上面的代码重写为以下内容。它使用一个主线程不断创建异步任务,并使用 SemaphoreSlim 将并发任务的数量限制为 10。

Task.Factory.StartNew(() => WorkerMainAsync(), TaskCreationOptions.LongRunning);

async Task WorkerMainAsync()
{
  SemaphoreSlim ss = new SemaphoreSlim(10);
  while (true)
  {
    await ss.WaitAsync();
    Task.Run(async () => 
              {
                await DoPollingThenWorkAsync();
                ss.Release();
              });
  }
}

async Task DoPollingThenWorkAsync()
{
  var msg = Poll();
  if (msg != null)
  {
    await Task.Delay(3000); // process the I/O-bound job
  }
}

两者的行为应该相同。但我认为第二种选择似乎更好,因为它不会阻塞线程。但不利的一面是我不能等待(优雅地停止任务),因为任务就像一劳永逸。第二种选择是取代传统工作线程模式的正确方法吗?

4

2 回答 2

4

当你有异步代码时,你通常没有理由使用Task.Run()(或者,更糟糕的是,Task.Factory.StartNew())。这意味着您可以将代码更改为以下内容:

await WorkerMainAsync();

async Task WorkerMainAsync()
{
  SemaphoreSlim ss = new SemaphoreSlim(10);
  while (true)
  {
    await ss.WaitAsync();
    // you should probably store this task somewhere and then await it
    var task = DoPollingThenWorkAsync();
  }
}

async Task DoPollingThenWorkAsync(SemaphoreSlim semaphore)
{
  var msg = Poll();
  if (msg != null)
  {
    await Task.Delay(3000); // process the I/O-bound job
  }

  // this assumes you don't have to worry about exceptions
  // otherwise consider try-finally
  semaphore.Release();
}
于 2013-10-13T10:13:02.833 回答
2

通常你不会async/await CPU 密集型任务中使用。启动此类任务的方法 ( WorkerMainAsync) 可以使用async/await,但您应该跟踪待处理的任务:

async Task WorkerMainAsync()
{
  SemaphoreSlim ss = new SemaphoreSlim(10);
  List<Task> trackedTasks = new List<Task>();
  while (DoMore())
  {
    await ss.WaitAsync();
    trackedTasks.Add(Task.Run(() => 
              {
                DoPollingThenWorkAsync();
                ss.Release();
              }));
  }
  await Task.WhenAll(trackedTasks);
}

void DoPollingThenWorkAsync()
{
  var msg = Poll();
  if (msg != null)
  {
    Thread.Sleep(2000); // process the long running CPU-bound job
  }
}

另一个练习是在任务完成时删除trackedTasks它们。例如,您可以使用ContinueWith来删除已完成的任务(在这种情况下,请记住使用lock来防止trackedTasks同时访问)。

如果你真的需要使用awaitinside DoPollingThenWorkAsync,代码不会有太大变化:

    trackedTasks.Add(Task.Run(async () => 
              {
                await DoPollingThenWorkAsync();
                ss.Release();
              }));

请注意,在这种情况下,您将在此处处理异步 lambda的嵌套任务Task.Run,该任务将自动为您解包。

于 2013-10-13T04:44:40.313 回答