4

我已经构建了这段代码来并行处理大量字符串之间的字符串比较,以加快速度。

我使用了 ConcurrentBag,因此所有线程(任务)都可以写入线程安全集合。然后我将这个集合转储到一个文件中。

我遇到的问题是ConcurrentBag<string> log,我转储到文件的填充速度比写入文件的速度快。所以我的程序不断消耗越来越多的内存,直到内存不足。

我的问题是我能做什么?改进对日志的写入?暂停任务直到 ConcurrentBag 被转储然后恢复任务?最快的选择是什么?

这是代码:

CsvWriter csv = new CsvWriter(@"C:\test.csv");

List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
ConcurrentBag<string> log = new ConcurrentBag<string>();
int i = 0;

var taskWriteToLog = new Task(() =>
{
    // Consume the items in the bag
    string item;
    while (true)  //  (!log.IsEmpty)
    {
        if (!log.IsEmpty)
        {
            if (log.TryTake(out item))
            {
                csv.WriteLine(item);
            }
            else
                Console.WriteLine("Concurrent Bag busy");
        }
        else
        {
            System.Threading.Thread.Sleep(1000);
        }
    }
});

taskWriteToLog.Start();

Parallel.ForEach(bailleurs, s1 =>
{
    foreach (Bailleur s2 in bailleurs)
    {
        var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name);
        string line = String.Format("\"LCS\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, lcs2.Item2);
        log.Add(line);
        // Console.WriteLine(line);

        var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name);
        line = String.Format("\"DICE\",\"{0}\",\"{1}\",\"{2}\"", s1.Name, s2.Name, dic);
        log.Add(line);
        // Console.WriteLine(line);
    }
    i++;
    Console.WriteLine(i);
});

public class CsvWriter
{
    public string FilePath { get; set; }
    private FileStream _fs { get; set; }
    private StreamWriter _sw { get; set; }

    public CsvWriter2(string filePath)
    {
        FilePath = filePath;
        _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write);
        _sw = new StreamWriter(_fs);
    }

    public void WriteLine(string line)
    {
        _sw.WriteLine(line);
    }
}
4

3 回答 3

8

不要直接使用并发包,使用具有并发包的BlockingCollection作为后备存储(默认情况下它是并发队列)。

构造函数重载之一允许您设置集合大小的上限,如果袋子已满,它将阻塞插入线程,直到有空间插入。

它还为您提供了GetConsumingEnumerable(),这使得从袋子中取出物品变得非常容易,您只需在 foreach 循环中使用它,它就会一直提供您的消费者数据,直到调用CompleteAdding 。之后,它会一直运行到袋子空了,然后像其他IEnumerable已完成的正常程序一样退出。如果在调用 CompleteAdding 之前包“变干”,它将阻塞线程并在将更多数据放入包中时自动重新启动。

void ProcessLog()
{
    CsvWriter csv = new CsvWriter(@"C:\test.csv");

    List<Bailleur> bailleurs = DataLoader.LoadBailleurs();

    const int MAX_BAG_SIZE = 500;
    BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE);

    int i = 0;

    var taskWriteToLog = new Task(() =>
    {
        // Consume the items in the bag, no need for sleeps or poleing, When items are available it runs, when the bag is empty but CompletedAdding has not been called it blocks.
        foreach(string item in log.GetConsumingEnumerable())
        {
            csv.WriteLine(item);
        }
    });

    taskWriteToLog.Start();

    Parallel.ForEach(bailleurs, s1 =>
    {
        //Snip... You can switch to BlockingCollection without any changes to this section of code.
    });

    log.CompleteAdding(); //lets anyone using GetConsumingEnumerable know that no new items are comming so they can leave the foreach loops when the bag becomes empty.
}
于 2013-08-16T07:04:50.913 回答
2

使用BlockingCollection代替ConcurrentBag

BlockingCollection<string> log = new BlockingCollection<string>();
var item = log.Take();

在这种情况下Take,将被阻止,直到插入项目并且您不必检查log.IsEmpty。也不需要 Thread.Sleep

while (true)
{
    var item = log.Take();
    //Do something with item......
}
于 2013-08-16T07:01:03.727 回答
0

首先,您似乎正在使用行作为块写入文件?

如果您可以将所有数据放入对象并将其写成更大的块,那么速度会更快。目前,您可能正在达到您正在写入的设备的 IOPS 最大值。你的线条会很小。因此,您的写入模式将看起来像 4k 随机 IO.. 或更糟。

使用不同的集合不会改变磁盘写入是您正在做的最慢的事情这一事实。

查看 concurrentbag 可能无法直接实现,但如果您可以从包中删除行并将它们连接成一个接近 1-5MB 的大字符串/字节数组,您应该提高性能。(您可能需要将 CR LF 插入回字符串中。)

于 2013-08-16T07:02:51.347 回答