16

我最近听到了很多关于 .NET 4.0 中的 TPL 的播客。它们中的大多数描述了诸如下载图像或进行计算之类的后台活动,使用任务以使工作不会干扰 GUI 线程。

我处理的大多数代码都具有多生产者/单一消费者的风格,其中来自多个来源的工作项必须排队,然后按顺序处理。一个示例是日志记录,其中来自多个线程的日志行被顺序排列到单个队列中,以最终写入文件或数据库。来自任何单一来源的所有记录都必须保持有序,并且来自同一时刻的记录在最终输出中应该彼此“接近”。

所以多个线程或任务或任何都在调用队列器:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

一个专用的工作线程处理队列:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

将工作线程专用于这些任务的消费者端似乎总是合理的。我应该改用 TPL 中的一些构造来编写未来的程序吗?哪一个?为什么?

4

4 回答 4

13

您可以按照 Wilka 的建议使用长时间运行的任务来处理 BlockingCollection 中的项目。这是一个几乎可以满足您的应用程序要求的示例。你会看到类似这样的输出:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

并不是说来自 A、B、C 和 D 的输出看起来是随机的,因为它们取决于线程的开始时间,但 B 总是出现在 B1 之前。

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}
于 2010-05-06T07:20:02.350 回答
5

我知道这个答案大约晚了一年,但看看MSDN

它显示了如何从 TaskScheduler 类创建 LimitedConcurrencyLevelTask​​Scheduler。通过将并发限制为单个任务,然后应该按顺序处理您的任务,因为它们通过以下方式排队:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});
于 2011-02-09T10:19:26.253 回答
3

我不确定 TPL 在您的用例中是否足够。据我了解,TPL 的主要用例是将一项大型任务拆分为几个可以并行运行的较小任务。例如,如果您有一个大列表,并且您想对每个元素应用相同的转换。在这种情况下,您可以有多个任务在列表的子集上应用转换。

你描述的情况对我来说似乎不适合这张照片。在您的情况下,您没有多个并行执行相同操作的任务。您有几个不同的任务,每个任务都是自己的工作(生产者)和一个消耗的任务。如果你想拥有多个消费者,也许 TPL 可以用于消费者部分,因为在这种情况下,每个消费者都做同样的工作(假设你找到了一个逻辑来强制你寻找的时间一致性)。

好吧,这当然只是我对这个问题的个人看法

健康长寿·繁荣昌盛

于 2010-02-23T12:46:24.497 回答
2

听起来BlockingCollection对您来说很方便。所以对于你上面的代码,你可以使用类似的东西(假设_queue是一个BlockingCollection实例):

// for your producers 
_queue.Add(some_work);

处理队列的专用工作线程:

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

注意:当你所有的生产者都完成生产后,你需要打电话CompleteAdding()_queue否则你的消费者将被困在等待更多的工作。

于 2010-03-21T18:44:12.823 回答