11

我需要找到两个已排序整数数组的交集并快速完成。

现在,我正在使用以下代码:

int i = 0, j = 0;

while (i < arr1.Count && j < arr2.Count)
{
    if (arr1[i] < arr2[j])
    {
        i++;
    }
    else
    {
        if (arr2[j] < arr1[i])
        {
            j++;
        }
        else 
        {
            intersect.Add(arr2[j]);
            j++;
            i++;
        }
    }
}

不幸的是,完成所有工作可能需要几个小时。

如何更快地做到这一点?我发现这篇文章使用了 SIMD 指令。是否可以在 .NET 中使用 SIMD?

你有什么想法:

http://docs.go-mono.com/index.aspx?link=N:Mono.Simd Mono.SIMD

http://netasm.codeplex.com/ NetASM(将 asm 代码注入托管)

和类似http://www.atrevido.net/blog/PermaLink.aspx?guid=ac03f447-d487-45a6-8119-dc4fa1e932e1

 

编辑:

当我说数千时,我的意思是跟随(在代码​​中)

for(var i=0;i<arrCollection1.Count-1;i++)
{
    for(var j=i+1;j<arrCollection2.Count;j++)
    {
        Intersect(arrCollection1[i],arrCollection2[j])  
    }
}
4

5 回答 5

10

更新

我得到的最快的是 200 毫秒,数组大小为 10 百万,不安全版本(最后一段代码)。

我做过的测试:

var arr1 = new int[10000000];
var arr2 = new int[10000000];

for (var i = 0; i < 10000000; i++)
{
    arr1[i] = i;
    arr2[i] = i * 2;
}

var sw = Stopwatch.StartNew();

var result = arr1.IntersectSorted(arr2);

sw.Stop();

Console.WriteLine(sw.Elapsed); // 00:00:00.1926156

全文:

我已经测试了各种方法来做到这一点,发现这非常好:

public static List<int> IntersectSorted(this int[] source, int[] target)
{
    // Set initial capacity to a "full-intersection" size
    // This prevents multiple re-allocations
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                ints.Add(source[i++]);
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }

    // Free unused memory (sets capacity to actual count)
    ints.TrimExcess();

    return ints;
}

为了进一步改进,您可以删除ints.TrimExcess();,这也会产生很大的不同,但您应该考虑是否需要该内存。

此外,如果您知道可能会中断使用交叉点的循环,并且您不必将结果作为数组/列表,您应该将实现更改为迭代器:

public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                yield return source[i++];
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }
}

另一个改进是使用不安全的代码:

public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    fixed (int* ptSrc = source)
    {
        var maxSrcAdr = ptSrc + source.Length;

        fixed (int* ptTar = target)
        {
            var maxTarAdr = ptTar + target.Length;

            var currSrc = ptSrc;
            var currTar = ptTar;

            while (currSrc < maxSrcAdr && currTar < maxTarAdr)
            {
                switch ((*currSrc).CompareTo(*currTar))
                {
                    case -1:
                        currSrc++;
                        continue;
                    case 1:
                        currTar++;
                        continue;
                    default:
                        ints.Add(*currSrc);
                        currSrc++;
                        currTar++;
                        continue;
                }
            }
        }
    }

    ints.TrimExcess();
    return ints;
}

总之,对性能影响最大的是 if-else。把它变成一个开关盒有很大的不同(大约快 2 倍)。

于 2012-06-03T00:21:27.340 回答
2

你有没有尝试过这样简单的事情:

var a = Enumerable.Range(1, int.MaxValue/100).ToList();
var b = Enumerable.Range(50, int.MaxValue/100 - 50).ToList();

//var c = a.Intersect(b).ToList();
List<int> c = new List<int>();

var t1 = DateTime.Now;

foreach (var item in a)
{
    if (b.BinarySearch(item) >= 0)
        c.Add(item);
}

var t2 = DateTime.Now;

var tres = t2 - t1;

这段代码需要 1 个包含21,474,836 个元素的数组,另一个包含21,474,786个元素

如果我使用var c = a.Intersect(b).ToList();我会得到一个OutOfMemoryException

结果产品将是使用嵌套 foreach的461,167,507,485,096次迭代

但是使用这个简单的代码,交集发生在TotalSeconds = 7.3960529(使用一个核心)

现在我仍然不开心,所以我试图通过并行打破它来提高性能,一旦我完成我会发布它

于 2012-06-03T00:48:02.070 回答
1

Yorye Nathan 给了我两个数组与最后一个“不安全代码”方法的最快交集。不幸的是,它对我来说仍然太慢了,我需要组合数组交叉点,最多 2^32 个组合,几乎没有?我进行了以下修改和调整,时间缩短到 2.6 倍。您需要在之前进行一些预优化,以确保您可以以某种方式做到这一点。我只使用索引而不是实际的对象或 ID 或其他一些抽象比较。因此,例如,如果您必须像这样与大数字相交

Arr1: 103344、234566、789900、1947890、 Arr2: 150034、234566、845465、23849854

将所有内容放入数组中

Arr1: 103344、234566、789900、1947890、150034、845465、23849854

并使用结果数组的有序索引作为交集

Arr1Index: 0、1、2、3 Arr2Index : 1、4、5、6

现在我们有了更小的数字,我们可以用它们来构建其他一些不错的数组。在从 Yorye 获取方法之后,我使用 Arr2Index 并将其扩展为理论上的布尔数组,实际上是字节数组,因为内存大小的含义,如下:

Arr2IndexCheck: 0, 1, 0, 0, 1, 1 ,1

这或多或少是一本字典,它告诉我第二个数组是否包含任何索引。下一步我没有使用同样需要时间的内存分配,而是在调用方法之前预先创建了结果数组,因此在查找我的组合的过程中我从不实例化任何东西。当然你必须单独处理这个数组的长度,所以也许你需要把它存储在某个地方。

最后代码如下所示:

    public static unsafe int IntersectSorted2(int[] arr1, byte[] arr2Check, int[] result)
    {
        int length;

        fixed (int* pArr1 = arr1, pResult = result)
        fixed (byte* pArr2Check = arr2Check)
        {
            int* maxArr1Adr = pArr1 + arr1.Length;
            int* arr1Value = pArr1;
            int* resultValue = pResult;

            while (arr1Value < maxArr1Adr)
            {
                if (*(pArr2Check + *arr1Value) == 1)
                {
                    *resultValue = *arr1Value;
                    resultValue++;
                }

                arr1Value++;
            }

            length = (int)(resultValue - pResult);
        }

        return length;
    }

你可以看到函数返回的结果数组大小,然后你做你想做的事情(调整它的大小,保留它)。显然,结果数组必须至少具有 arr1 和 arr2 的最小大小。

最大的改进是我只遍历第一个数组,最好是比第二个数组的大小更小,这样你的迭代次数就会更少。更少的迭代意味着更少的 CPU 周期,对吗?

So here is the really fast intersection of two ordered arrays, that if you need a reaaaaalllyy high performance ;).

于 2015-10-20T14:27:52.210 回答
0

是整数数组arrCollection1arrCollection2集合吗?在这种情况下,您应该通过开始第二个循环i+1而不是0

于 2012-06-03T00:15:51.417 回答
0

C# 不支持 SIMD。此外,我还没有弄清楚为什么,使用 SSE 的 DLL 在从 C# 调用时并不比非 SSE 等效函数快。此外,我所知道的所有 SIMD 扩展都不适用于分支,即您的“if”语句。

如果您使用的是 .net 4.0,如果您有多个内核,则可以使用 Parallel For 来提高速度。否则,如果您拥有 .net 3.5 或更低版本,则可以编写多线程版本。

这是一个类似于你的方法:

    IList<int> intersect(int[] arr1, int[] arr2)
    {
        IList<int> intersect = new List<int>();
        int i = 0, j = 0;
        int iMax = arr1.Length - 1, jMax = arr2.Length - 1;
        while (i < iMax && j < jMax)
        {
            while (i < iMax && arr1[i] < arr2[j]) i++;
            if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
            while (i < iMax && arr1[i] == arr2[j]) i++; //prevent reduntant entries
            while (j < jMax && arr2[j] < arr1[i]) j++;
            if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
            while (j < jMax && arr2[j] == arr1[i]) j++; //prevent redundant entries
        }
        return intersect;
    }

这还可以防止任何条目出现两次。使用 2 个大小为 1000 万的排序数组,它在大约一秒钟内完成。如果您在 For 语句中使用 array.Length ,编译器应该删除数组边界检查,但我不知道这是否适用于 while 语句。

于 2012-06-03T00:17:27.593 回答