8

我想在需要按顺序(在每个流中)处理的多个流中对相关任务进行排队。这些流可以并行处理。

具体来说,假设我需要两个队列,并且我希望每个队列中的任务按顺序处理。以下是用于说明所需行为的示例伪代码:

Queue1_WorkItem wi1a=...;

enqueue wi1a;

... time passes ...

Queue1_WorkItem wi1b=...;

enqueue wi1b; // This must be processed after processing of item wi1a is complete

... time passes ...

Queue2_WorkItem wi2a=...;

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b

... time passes ...

Queue1_WorkItem wi1c=...;

enqueue wi1c; // This must be processed after processing of item wi1b is complete

这是一个带有箭头的图表,说明了工作项之间的依赖关系:

在此处输入图像描述

问题是如何使用 C# 4.0/.NET 4.0 做到这一点?现在我有两个工作线程,每个队列一个BlockingCollection<>,每个队列使用一个。我想改为利用 .NET 线程池并让工作线程同时(跨流)处理项目,但在流中串行处理。换句话说,我希望能够指出例如 wi1b 取决于 wi1a 的完成,而不必跟踪完成并记住 wi1a,当 wi1b 到达时。换句话说,我只想说,“我想为 queue1 提交一个工作项,该工作项将与我已经为 queue1 提交的其他项串行处理,但可能与提交到其他队列的工作项并行处理”。

我希望这个描述是有道理的。如果没有,请随时在评论中提问,我会相应地更新这个问题。

谢谢阅读。

更新:

总结到目前为止“有缺陷”的解决方案,以下是我无法使用的答案部分的解决方案以及我无法使用它们的原因:

TPL 任务需要为ContinueWith(). 我不想在提交新任务时保留每个队列的先前任务的知识。

TDF ActionBlocks 看起来很有希望,但似乎发布到 ActionBlock 的项目是并行处理的。我需要对特定队列的项目进行连续处理。

更新 2:

回复:动作块

似乎将MaxDegreeOfParallelism选项设置为 one 会阻止并行处理提交给单个ActionBlock. 因此,每个队列似乎ActionBlock解决了我的问题,唯一的缺点是这需要安装和部署 Microsoft 的 TDF 库,我希望有一个纯 .NET 4.0 解决方案。到目前为止,这是候选人接受的答案,除非有人能想出一种方法来使用纯 .NET 4.0 解决方案来做到这一点,该解决方案不会退化为每个队列的工作线程(我已经在使用)。

4

4 回答 4

4

我了解您有很多队列并且不想占用线程。每个队列可以有一个ActionBlock 。ActionBlock 自动完成您需要的大部分工作:它按顺序处理工作项,并且仅在工作未决时启动任务。当没有工作挂起时,没有任务/线程被阻塞。

于 2012-06-27T15:48:10.630 回答
3

最好的方法是使用Task Parallel Library (TPL)and Continuations。延续不仅允许您创建任务流,还可以处理您的异常。这是对 TPL的一个很好的介绍。但是给你一些想法...

您可以使用启动 TPL 任务

Task task = Task.Factory.StartNew(() => 
{
    // Do some work here...
});

现在要在前面的任务完成(错误或成功)时开始第二个任务,您可以使用该ContinueWith方法

Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task"));
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation..."));

因此,一旦task1完成、失败或被取消,就会task2“启动”并开始运行。请注意,如果task1在到达第二行代码之前已经完成,task2将被安排立即执行。antTask传递给第二个 lambda的参数是对前面任务的引用。有关更详细的示例,请参阅此链接...

您还可以传递先前任务的延续结果

Task.Factory.StartNew<int>(() => 1)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64.

笔记。请务必阅读提供的第一个链接中的异常处理,因为这可能会使新手误入 TPL。

最后一件要特别关注的事情是子任务。子任务是那些被创建为AttachedToParent. 在这种情况下,直到所有子任务都完成后,延续才会运行

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
    Task.Factory.StartNew(() => { SomeMethod() }, atp);
    Task.Factory.StartNew(() => { SomeOtherMethod() }, atp); 
}).ContinueWith( cont => { Console.WriteLine("Finished!") });

我希望这有帮助。

编辑:你有没有ConcurrentCollections特别看过BlockngCollection<T>. 所以在你的情况下,你可能会使用类似的东西

public class TaskQueue : IDisposable
{
    BlockingCollection<Action> taskX = new BlockingCollection<Action>();

    public TaskQueue(int taskCount)
    {
        // Create and start new Task for each consumer.
        for (int i = 0; i < taskCount; i++)
            Task.Factory.StartNew(Consumer);  
    }

    public void Dispose() { taskX.CompleteAdding(); }

    public void EnqueueTask (Action action) { taskX.Add(Action); }

    void Consumer()
    {
        // This seq. that we are enumerating will BLOCK when no elements
        // are avalible and will end when CompleteAdding is called.
        foreach (Action action in taskX.GetConsumingEnumerable())
            action(); // Perform your task.
    }
}
于 2012-06-27T15:32:47.187 回答
1

基于 TPL 的 .NET 4.0 解决方案是可能的,同时隐藏了它需要将父任务存储在某处的事实。例如:

class QueuePool
{
    private readonly Task[] _queues;

    public QueuePool(int queueCount)
    { _queues = new Task[queueCount]; }

    public void Enqueue(int queueIndex, Action action)
    {
        lock (_queues)
        {
           var parent = _queue[queueIndex];
           if (parent == null)
               _queues[queueIndex] = Task.Factory.StartNew(action);
           else
               _queues[queueIndex] = parent.ContinueWith(_ => action());
        }
    }
}

这是对所有队列使用单个锁来说明这个想法。然而,在生产代码中,我会为每个队列使用一个锁来减少争用。

于 2012-06-28T03:27:49.620 回答
0

看起来您已经拥有的设计很好并且有效。您的工作线程(每个队列一个)是长时间运行的,因此如果您想改用任务,TaskCreationOptions.LongRunning请指定以便获得一个专用的工作线程。

但是这里实际上不需要使用 ThreadPool。它没有为长期工作提供很多好处。

于 2012-06-27T15:33:56.747 回答