1

我想优化这段代码:

    public static void ProcessTo(this StreamReader sr, StreamWriter sw, Action<StreamWriter, string> action, FileProcessOptions fpo = null)
    {
        if (fpo == null)
        {
            fpo = new FileProcessOptions();
        }

        List<string> buffer = new List<string>(fpo.BuferSize);

        while (!sr.EndOfStream)
        {
            buffer.Clear();

            while (!sr.EndOfStream && buffer.Count < fpo.BuferSize)
            {
                buffer.Add(sr.ReadLine());
            }

            if (fpo.UseThreads)
            {
                buffer.AsParallel().ForAll(line => action(sw, line));
            }
            else
            {
                buffer.ForEach(line => action(sw, line));
            }
        }
    }

我处理大量数据并希望并行化该过程。通常数据归档,因此使用多线程处理数据流非常重要

4

2 回答 2

2

如果你不传递 a StreamReader,而只是传递文件名,你可以这样写:

Parallel.Foreach(File.ReadLines(filename), (line) => action(sw, line));

如果你通过了,你仍然可以这样做StreamReader。您只需要创建一个可以读取它的枚举器。类似于这里所做的事情:GetEnumerator() 在实现 IEnumerable<T> 和 IEnumerator<T> 时的推荐行为。使用它,你会写:

LineReaderEnumerable myEnumerable = new LineEnumerator(sr);
Parallel.Foreach(myEnumerable, (line) => action(sw, line));

但是,您有一个潜在的问题,因为您可能会有多个线程写入该流编写器。并且StreamWriter不支持并发写入。它会抛出异常。如果您正在同步对输出文件的访问(例如,使用锁),那么您在这里没问题。

您将遇到的另一个问题是输出的顺序。几乎可以肯定的是,如果您按顺序读取行[1, 2, 3, 4, ... n],输出顺序会有所不同。你可能会得到[1, 2, 4, 3, 6, 5, 7, 9, 8 ... n, n-1]. 如果输出顺序很重要,您必须想出一种方法来确保以正确的顺序输出事物。

关于锁,您有:

sr.ProcessParalel(line => 
{ 
    string[] ls = line.Split('\t');
    lock (sw)
    {
        sw.Write(float.Parse(ls[0]));
        sw.Write(int.Parse(ls[1]) * 10 + 1);
        for (int i = 2; i < ls.Length; i++)
        {
            sw.Write(int.Parse(ls[1]));
        }
    }
 });

问题不在于锁。问题是您持有锁的时间过长。按照您编写的方式,代码实际上是单线程的,因为所有线程都在等待该锁进行处理。您需要更改您的处理,以使锁定的保持时间尽可能短。

将您的输出构建为 a StringBuilder,将其转换为字符串,然后输出该字符串。例如:

string[] ls = line.Split('\t');
StringBuilder sb = new StringBuilder();
sb.Append(float.Parse(ls[0]));
sb.Append(' ');
sb.Append(int.Parse(ls[1])) * 10 + 1);
for (int i = 2; i < ls.Length; i++)
{
    sb.Append(' ');
    sb.Append(int.Parse(ls[i]));    }
}
var sout = sb.ToString();
// lock and write
lock (sw)
{
    sw.Write(sout);
}

你可以用StringWriter做同样的事情。

于 2013-07-12T19:08:11.483 回答
0

最终解决方案:

        public static IEnumerable<string> GetEnumirator(this StreamReader sr)
    {
        while (!sr.EndOfStream)
        {
            yield return sr.ReadLine();
        }
    }

    public static void ProcessParalel(this StreamReader sr, Action<string> action)
    {
        sr.GetEnumirator().AsParallel().ForAll(action);
    }

    public static void ProcessTo(this StreamReader sr, BinaryWriter bw, Action<BinaryWriter, string> action, FileProcessOptions fpo = null)
    {
        sr.ProcessParalel(line =>
        {
            using (MemoryStream ms = new MemoryStream())
            {
                BinaryWriter lbw = new BinaryWriter(ms);

                action(lbw, line);
                ms.Seek(0, SeekOrigin.Begin);

                lock (bw)
                {
                    ms.WriteTo(bw.BaseStream);
                }
            }
        });
    }

使用压缩输入流,我得到了 3 倍的加速度

于 2013-07-14T05:44:18.217 回答