4

在我的应用程序中,我有三个类,ExtractorTransformerLoader它们由第四个类协调,CoordinatorExtractorTransformer并且Loader非常简单,请执行以下操作:

Extractor

公开一个名为Resultstype的成员IEnumerable<string>,例如通过从文本文件中读取。提取应该是同步的。

Transformer

公开一个名为的成员,该成员Transform接受一个字符串,并通过一些预计耗时的过程将其转换为另一个字符串(在此处使用并行处理)。

Loader

公开一个被调用的成员,该成员Load接受一个字符串并将其加载到某种最终形式(例如另一个文本文件)中。加载应该是同步的。

这些Coordinator类协调这三个操作。转换过程应该并行完成,然后将结果推送到加载器读取的队列中。CoordinatorRun()方法如下所示:

Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

foreach(string output in outputs)
{
  loader.Load(output);
}

这工作得很好,除了所有转换必须在任何加载之前完成 - 即Parallel.ForEach()在以下foreach开始之前完成。我希望每个输出在准备好后立即传递给加载器。

我也试过这个:

Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

foreach (string input in extractor.Results)
{
  string input1 = input;
  Task task = Task.Factory.StartNew(
                    () => outputs.Enqueue(transformer.Transform(input1)));
}

foreach(string output in outputs)
{
  loader.Load(output);
}

但是foreach在任何输出被添加到队列之前,底部的循环就被击中了,所以它只是退出了。

调用结果后如何立即加载transformer.Transform()

4

1 回答 1

7

尝试BlockingCollection使用Parallel.Invoke. 在下面的示例中,GetConsumingEnumerable(生产者-消费者模式的消费者CompleteAdding部分)在被调用之前不会完成,因此load将运行直到fill完成。

var outputs = new BlockingCollection<string>();

// aka Producer
Action fill = () => {
    Parallel.ForEach(extractor.Results, x => outputs.Add(transformer.Transform(x)));        
    outputs.CompleteAdding();
};

// aka Consumer
Action load = () => {
   foreach(var o in outputs.GetConsumingEnumerable()) 
       loader.Load(o);
}

Parallel.Invoke(fill, load);
于 2012-10-03T15:56:44.860 回答