3

我想在 C# 中创建一个并行管道。我声明了一个名为 IOperation 的接口:

public interface IOperation<Tin, Tout>
{
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input);
}

现在我想编写一个类,它可以并行执行多个这些操作。我对此感到厌烦:

public class Pipeline : IPipeline
{
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>();
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>();
    public void Register(IOperation<Object, Object> operation)
    {
        operations.Add(operation);
    }

    public void Execute()
    {

    }
}

但我没有找到任何解决方案来保存操作和操作之间的缓冲区,因为它们都有不同的泛型类型。有人有想法吗?

4

4 回答 4

1

Microsoft has something exactly like this -- TPL Dataflow lets you define blocks in a pipeline, with fine-grained controls on how they are buffered and parallelized.

Unlike your solution, it uses a fully asynchronous push design. It does not use a BlockingCollection (a blocking pull design), and will be significantly faster for it if you have a deep pipeline.

于 2013-11-11T16:31:36.980 回答
1

目前还不清楚您的管道是如何工作的。你为什么要绕过 BlockingCollections?为什么要使用泛型,然后将object其作为类型输入?

考虑改为使用带有类型委托加载的管道,Action然后使用任务并行库创建并行执行这些操作的任务。

public void Register(Action operation)
    {
        operations.Add(operation);
    }

public void Execute()
    {
        foreach (var action in operations)
          Task.StartNew(operation);
    }

但这并不是真正的“管道”,它只是一组并行执行的操作。

流水线通常具有输入类型和输出类型的流水线步骤。您可以通过创建类似的东西来处理这个问题,PipelineStep<T,U>并且您将构建传入 Func 操作的每个管道步骤。在内部,每个管道步骤可以使用输入 IEnumerable 并产生输出 IEnumerable,它可以使用 Task 或更简单地使用并行 foreach 循环来执行此操作。

或者,您也许可以使用 TPL 的Task.ContinueWith方法将任务从输入到输出链接在一起。

于 2011-01-06T16:35:35.963 回答
1

您是否考虑过使用 TPL 中的 Parallel.ForEach?
任务并行库 (TPL) 是 .NET 4 中的一组公共类型和 API。

于 2011-01-06T16:13:38.780 回答
0

在http://msdn.microsoft.com/en-us/library/ff963548.aspx上有一篇关于 BlockingCollection 并行管道的好文章。

基本上每个步骤都应该有一个 BlockingCollection 类型的输出队列。它从上一步的输出队列中获取项目,并在完成处理后将它们添加到它的输出中。

于 2013-11-11T16:20:20.573 回答