6

我正在编写一个完全托管的 Mercurial 库(将用于Windows 的完全托管的 Mercurial Server,即将推出),我遇到的最严重的性能问题之一是,奇怪的是,将数组拆分为多个部分。

这个想法如下:有一个字节数组,大小从几百字节到一兆字节不等,我需要做的就是将它拆分为由字符分隔的部分,在我的特定情况下,\n字符。

现在 dotTrace 向我展示的是我的“优化”版本Split代码正确的,这是我开始使用的幼稚版本)在 2,300 次调用中占用了 11 秒(dotTrace 本身引入了明显的性能影响,但一切都取决于规模)。

以下是数字:

  • unsafe版本:11 297ms 用于2 312通话
  • 托管(“天真”)版本:调用20 001毫秒2 312

所以这里是:在 C# 中拆分数组的最快(最好是可移植的,这意味着同时支持 x86 和 x64)方法是什么。

4

3 回答 3

4

我相信问题是,你在循环中做很多复杂的操作。此代码删除了循环内除单次加法和比较之外的所有操作。只有在检测到拆分或在数组末尾时才会发生其他复杂的事情。

此外,很难判断您使用什么样的数据运行测试,所以这只是猜测。

public static unsafe Segment[] Split2(byte[] _src, byte value)
{
    var _ln = _src.Length;

    if (_ln == 0) return new Segment[] { };

    fixed (byte* src = _src)
    {
        var segments = new LinkedList<Segment>(); // Segment[c];

        byte* last = src;
        byte* end = src + _ln - 1;
        byte lastValue = *end;
        *end = value; // value-termination

        var cur = src;
        while (true)
        {
            if (*cur == value)
            {
                int begin = (int) (last - src);
                int length = (int) (cur - last + 1);
                segments.AddLast(new Segment(_src, begin, length));

                last = cur + 1;

                if (cur == end)
                {
                    if (lastValue != value)
                    {
                        *end = lastValue;
                    }
                    break;
                }
            }
            cur++;
        }

        return segments.ToArray();
    }
}

编辑:固定代码,因此它返回正确的结果。

于 2012-08-26T20:39:43.210 回答
3

对于 Split,在 32 位机器上处理 ulong 真的很慢,所以一定要减少到 uint。如果你真的想要ulong,实现两个版本,一个是32位的,一个是64位的。

您还应该衡量一次处理字节是否更快。

需要剖析内存分配的成本。如果它足够大,请尝试在多个调用中重用内存。

其他:

ToString:使用 "(" + Offset.ToString() + ", " + Length.ToString() + ")" 更快;

GetHashCode:尝试固定(字节 * b = & 缓冲区 [偏移量])


如果多次使用,这个版本应该非常快。关键点:内部数组扩展到合适大小后没有新的内存分配,最小的数据复制。

class ArraySplitter
{
    private byte[] m_data;
    private int    m_count;
    private int[]  m_stops;

    private void AddRange(int start, int stop)
    {
        // Skip empty range
        if (start > stop)
        {
            return;
        }

        // Grow array if needed
        if ((m_stops == null) || (m_stops.Length < (m_count + 2)))
        {
            int[] old = m_stops;

            m_stops = new int[m_count * 3 / 2 + 4];

            if (old != null)
            {
                old.CopyTo(m_stops, 0);
            }
        }

        m_stops[m_count++] = start;
        m_stops[m_count++] = stop;
    }

    public int Split(byte[] data, byte sep)
    {
        m_data  = data;
        m_count = 0;      // reuse m_stops

        int last = 0;

        for (int i = 0; i < data.Length; i ++)
        {
            if (data[i] == sep)
            {
                AddRange(last, i - 1);
                last = i + 1;
            }
        }

        AddRange(last, data.Length - 1);

        return m_count / 2;
    }

    public ArraySegment<byte> this[int index]
    {
        get
        {
            index *= 2;
            int start = m_stops[index];

            return new ArraySegment<byte>(m_data, start, m_stops[index + 1] - start + 1);
        }
    }
}

测试程序:

    static void Main(string[] args)
    {
        int count = 1000 * 1000;

        byte[] data = new byte[count];

        for (int i = 0; i < count; i++)
        {
            data[i] = (byte) i;
        }

        Stopwatch watch = new Stopwatch();

        for (int r = 0; r < 10; r++)
        {
            watch.Reset();
            watch.Start();

            int len = 0;

            foreach (var seg in data.MySplit(13))
            {
                len += seg.Count;
            }

            watch.Stop();

            Console.WriteLine("MySplit      : {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds);

            watch.Reset();
            watch.Start();

            ArraySplitter splitter = new ArraySplitter();

            int parts = splitter.Split(data, 13);

            len = 0;

            for (int i = 0; i < parts; i++)
            {
                len += splitter[i].Count;
            }

            watch.Stop();
            Console.WriteLine("ArraySplitter: {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds);
        }
    }

结果:

MySplit      : 996093    9.514 ms
ArraySplitter: 996093    4.754 ms
MySplit      : 996093    7.760 ms
ArraySplitter: 996093    2.710 ms
MySplit      : 996093    8.391 ms
ArraySplitter: 996093    3.510 ms
MySplit      : 996093    9.677 ms
ArraySplitter: 996093    3.468 ms
MySplit      : 996093    9.685 ms
ArraySplitter: 996093    3.370 ms
MySplit      : 996093    9.700 ms
ArraySplitter: 996093    3.425 ms
MySplit      : 996093    9.669 ms
ArraySplitter: 996093    3.519 ms
MySplit      : 996093    9.844 ms
ArraySplitter: 996093    3.416 ms
MySplit      : 996093    9.721 ms
ArraySplitter: 996093    3.685 ms
MySplit      : 996093    9.703 ms
ArraySplitter: 996093    3.470 ms
于 2012-08-26T19:58:23.203 回答
2

安东,

我不知道你是否还有兴趣优化这个,因为这个线程已经很老了,但是我看到你的代码在你的在线存储库中几乎是一样的,所以我想我会试一试。在评估您的 HgLab 应用程序时,我查看了您在 bitbucket.org 上的 HgSharp 代码。我使用本机构造重写了函数,大大简化了它。我的测试比你原来的程序节省了一半以上的时间。我通过加载一个几兆字节的源文件来测试它,并使用您的原始例程将时间与相同操作的性能进行比较。

除了重写基本逻辑之外,我决定使用ArraySegment<>框架内置的原生而不是您的自定义实现。唯一显着的区别是ArraySegment<>公开Count属性而不是Length属性。下面的代码不需要 unsafe 关键字,因为我没有使用指针,但是如果更改为这样做,似乎确实会有轻微的性能改进。


    public static ArraySegment<byte>[] SplitEx(this byte[] source, byte value) {
        var _previousIndex = -1;
        var _segments = new List<ArraySegment<byte>>();
        var _length = source.Length;

        if (_length > 0) {
            int _index;

            for (_index = 0; _index < _length; _index++) {
                var _value = source[_index];
                if (_value == value) {
                    _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex));
                    _previousIndex = _index;
                }
            }

            if (--_index != _previousIndex) {
                _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex));
            }
        }

        return _segments.ToArray();
    }
于 2012-12-18T01:01:04.000 回答