Looking for the best approach to reading from a data source such as Azure Table Storage (which is time-consuming), converting the data into JSON or CSV, and writing it to a local file whose name depends on the partition key.
One approach being considered is running the write-to-file task on a timer's Elapsed event with a fixed interval.
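A minimal sketch of that timer-based option (the class name, the buffer, and the file-naming helper are assumptions for illustration, not code from the question):

// Hypothetical sketch: flush buffered rows to per-partition files on a fixed interval.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Timers;

class TimerFlushSketch
{
    // Filled elsewhere by the (slow) Azure Table Storage reader:
    // each item is (partition key, one CSV line).
    static readonly ConcurrentQueue<(string PartitionKey, string CsvLine)> Buffer =
        new ConcurrentQueue<(string PartitionKey, string CsvLine)>();

    static void Main()
    {
        var timer = new Timer(5000);               // fixed 5-second interval
        timer.Elapsed += (sender, e) => Flush();   // run the write-to-file task on Elapsed
        timer.AutoReset = true;
        timer.Start();
        Console.ReadLine();                        // keep the process alive for the demo
    }

    // Drain whatever has been buffered so far and append each line to a file
    // named after its partition key.
    static void Flush()
    {
        while (Buffer.TryDequeue(out var item))
        {
            File.AppendAllText(item.PartitionKey + ".csv", item.CsvLine + Environment.NewLine);
        }
    }
}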
1 Answer
For things that do not parallelize well (such as I/O), the best approach is the producer-consumer model.
The way it works is that you have one thread dedicated to the non-parallelizable work; all it does is read items into a buffer. You then have a set of parallel tasks that all read from that buffer, process the data, and put the results into another buffer once they are done. If you then need to write the results back out in a non-parallelizable way, you have one more task that does the writing.
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public Stream ProcessData(string filePath)
{
    using(var sourceCollection = new BlockingCollection<string>())
    using(var destinationCollection = new BlockingCollection<SomeClass>())
    {
        //Create a new background task to start reading in the file
        Task.Factory.StartNew(() => ReadInFile(filePath, sourceCollection), TaskCreationOptions.LongRunning);

        //Create a new background task to process the read-in lines as they come in
        Task.Factory.StartNew(() => TransformToClass(sourceCollection, destinationCollection), TaskCreationOptions.LongRunning);

        //Process the newly created objects as they are created, on the same thread that originally called the function
        return TransformToStream(destinationCollection);
    }
}

private static void ReadInFile(string filePath, BlockingCollection<string> collection)
{
    foreach(var line in File.ReadLines(filePath))
    {
        collection.Add(line);
    }

    //This lets the consumer know that we will not be adding any more items to the collection.
    collection.CompleteAdding();
}

private static void TransformToClass(BlockingCollection<string> source, BlockingCollection<SomeClass> dest)
{
    //GetConsumingEnumerable() will take items out of the collection and block the thread if there are no items available and CompleteAdding() has not been called yet.
    Parallel.ForEach(source.GetConsumingEnumerable(),
                     (line) => dest.Add(SomeClass.ExpensiveTransform(line)));

    dest.CompleteAdding();
}

private static Stream TransformToStream(BlockingCollection<SomeClass> source)
{
    var stream = new MemoryStream();
    foreach(var record in source.GetConsumingEnumerable())
    {
        record.Serialize(stream);
    }
    return stream;
}
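A hypothetical call site, assuming the methods above live on a class named Processor (an assumed name) and that SomeClass.Serialize writes into the supplied stream:

// Hypothetical usage; "Processor" is an assumed name for the class holding ProcessData.
var processor = new Processor();
using (var output = processor.ProcessData(@"C:\data\input.txt"))
using (var file = File.Create(@"C:\data\output.bin"))
{
    output.Position = 0;   // rewind the MemoryStream before copying it out
    output.CopyTo(file);
}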
I would highly recommend reading the free book Patterns of Parallel Programming, which goes into this in some detail. There is an entire section explaining the producer-consumer model in detail.
Update: For a small performance boost, use GetConsumingPartitioner() from the Parallel Extensions Extras instead of GetConsumingEnumerable() in the Parallel.ForEach loop. ForEach makes some assumptions about the IEnumerable being passed in that cause it to take extra locks out; by passing a partitioner instead of an enumerable, it does not need to take those extra locks.
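Assuming the GetConsumingPartitioner() extension method from the Parallel Extensions Extras sample pack is referenced, the change to the loop above would look roughly like this:

// Sketch: pass a consuming partitioner instead of the consuming enumerable so
// Parallel.ForEach does not take the extra locks it needs for a plain IEnumerable.
// GetConsumingPartitioner() is an extension method from Parallel Extensions Extras,
// not part of the BCL.
Parallel.ForEach(source.GetConsumingPartitioner(),
                 (line) => dest.Add(SomeClass.ExpensiveTransform(line)));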
Answered 2013-10-26T16:01:55.663