0

我正在尝试通过Parallel.ForEach将处理后的数据添加到BlockingCollection.

问题是我希望TasktaskWriteMergedFile 至少每 800000 行消耗集合并将它们写入结果文件。

我想我无法在迭代中测试集合大小,因为它是并行的,所以我创建了Task.

EventWaitHandle在这种情况下,我可以将任务中的 while(true) 循环转换为吗?

const int MAX_SIZE = 1000000;
static BlockingCollection<string> mergeData;
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE);


string[] FilePaths = Directory.GetFiles("somepath");

var taskWriteMergedFile = new Task(() =>
{
    while ( true )
    {
        if ( mergeData.Count  > 800000)
        {
            String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable());
            //Write to file
        }
        Thread.Sleep(10000); 
    }
}, TaskCreationOptions.LongRunning);

taskWriteMergedFile.Start();
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));
mergeData.CompleteAdding();
4

1 回答 1

1

你可能不想那样做。相反,让您的任务在收到文件时将每一行写入文件。如果要将文件大小限制为 80,000 行,则在写入第 80,000 行后,关闭当前文件并打开一个新文件。

想一想,你所拥有的东西是不能工作的,因为GetConsumingEnumerable()在集合被标记为完成添加之前不会停止。会发生什么事情会经历睡眠循环,直到队列中有 80,000 个项目,然后它会阻塞String.Join直到主线程调用CompleteAdding. 有了足够的数据,您就会耗尽内存。

此外,除非您有充分的理由,否则不应ConcurrentBag在此处使用。只需使用 的默认值BlockingCollection,即ConcurrentQueue. ConcurrentBag是一种用途相当特殊的数据结构,其性能不如ConcurrentQueue.

所以你的任务变成:

var taskWriteMergedFile = new Task(() =>
{
    int recordCount = 0;
    foreach (var line in mergeData.GetConsumingEnumerable())
    {
        outputFile.WriteLine(line);
        ++recordCount;
        if (recordCount == 80,000)
        {
            // If you want to do something after 80,000 lines, do it here
            // and then reset the record count
            recordCount = 0;
        }
    }
}, TaskCreationOptions.LongRunning);

当然,这假设您已经在其他地方打开了输出文件。最好在任务开始时打开输出,并在foreach退出后关闭它。

另一方面,您可能不希望您的生产者循环是并行的。你有:

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath));

我不确定AddToDataPool在做什么,但是如果它正在读取文件并将数据写入集合,那么您会遇到一些问题。首先,磁盘驱动器一次只能做一件事,所以它最终会读取一个文件的一部分,然后是另一个文件的一部分,然后是另一个文件的一部分,等等。为了读取下一个文件的每个块,它必须寻找头部到适当的位置。磁盘磁头寻道非常昂贵——5 毫秒或更多。CPU 时间的永恒。除非您正在执行比读取文件花费更长的时间的繁重处理,否则一次处理一个文件几乎总是更好。除非您可以保证输入文件位于不同的物理磁盘上。. .

第二个潜在问题是在运行多个线程的情况下,您无法保证将内容写入集合的顺序。当然,这可能不是问题,但是如果您希望单个文件中的所有数据在输出中组合在一起,那么对于每个向集合写入多行的多个线程就不会发生这种情况。

只是要记住的事情。

于 2014-11-21T04:55:27.617 回答