10

我正在用 C# 进行流体模拟。每个循环我都需要计算空间中离散点的流体速度。作为该计算的一部分,我需要几十 KB 的暂存空间来保存一些 double[] 数组(数组的确切大小取决于一些输入数据)。数组仅在使用它们的方法期间才需要,并且有一些不同的方法需要像这样的暂存空间。

如我所见,构建临时数组有几种不同的解决方案:

  1. 每次调用该方法时,使用“new”从堆中获取内存。这就是我一开始在做的事情,但是它给垃圾收集器带来了很大的压力,而且每秒一到两次的几毫秒的峰值真的很烦人。

  2. 调用方法时将临时数组作为参数传递。问题是这迫使用户管理它们,包括适当地调整它们的大小,这是一个巨大的痛苦。由于它更改了 API,因此使用或多或少的暂存内存变得困难。

  3. 在不安全的上下文中使用 stackalloc 从程序堆栈中分配暂存内存。这可以正常工作,除了我需要使用 /unsafe 编译并在我的代码中不断地撒上不安全的块,我想避免这种情况。

  4. 程序启动时预分配私有数组一次。这很好,除非我不一定知道我需要的数组的大小,直到我可以查看一些输入数据。而且它变得非常混乱,因为你不能将这些私有变量的范围限制在一个方法中,所以它们会不断地污染命名空间。随着需要暂存内存的方法数量的增加,它的扩展性很差,因为我分配了很多只在一小部分时间内使用的内存。

  5. 创建某种中央池,并从池中分配暂存内存数组。这样做的主要问题是我看不到从中央池分配动态大小的数组的简单方法。我可以使用起始偏移量和长度,并让所有暂存内存本质上共享一个大数组,但我有很多现有代码假定为 double[]s。而且我必须小心使这样的池线程安全。

...

有没有人有类似问题的经验?从经验中可以提供任何建议/教训?

4

2 回答 2

7

我同情你的处境;当我在 Roslyn 工作时,我们非常仔细地考虑了由分配临时工作数组的收集压力引起的潜在性能问题。我们确定的解决方案是池化策略。

在编译器中,数组大小往往很小,因此经常重复。在您的情况下,如果您有大型阵列,那么我会遵循 Tom 的建议:简化管理问题并浪费一些空间。当您向池请求大小为 x 的数组时,将 x 向上舍入到最接近的 2 的幂并分配该大小的数组,或者从池中取出一个。调用者得到一个有点太大的数组,但可以编写它们来处理它。在池中搜索适当大小的数组应该不会太难。或者您可以维护一堆池,一个用于 1024 大小的数组,一个用于 2048,依此类推。

编写线程安全池并不太难,或者您可以将池线程设为静态,每个线程拥有一个池。

棘手的一点是将内存恢复到池中。有几种方法可以解决这个问题。首先,如果他们不想承担收集压力的代价,您可以简单地要求池化内存的用户在完成数组处理后调用“返回池中”方法。

另一种方法是在数组周围编写一个外观包装器,使其实现 IDisposable 以便您可以使用“使用”(*),并在将对象放回池中的对象上进行终结器,使其复活。(确保让终结器重新打开“我需要被终结”位。)复活的终结器让我紧张;我个人更喜欢前一种方法,这就是我们在罗斯林所做的。


(*) 是的,这违反了“使用”应该表明将非托管资源返回给操作系统的原则。本质上,我们通过自己的管理将托管内存视为非托管资源,因此还不错。

于 2013-03-31T14:53:09.990 回答
3

您可以将使用这些临时数组的代码包装在 using 语句中,如下所示:

using(double[] scratchArray = new double[buffer])
{
    // Code here...
}

这将通过在 using 语句末尾调用析构函数来显式释放内存。

不幸的是,上面的内容似乎不正确!取而代之的是,您可以尝试使用帮助函数返回适当大小的数组(大于大小的 2 的最接近幂),如果它不存在,则创建它。这样,您将只有对数个数组。如果您希望它是线程安全的,尽管您需要遇到更多麻烦。

它可能看起来像这样:(使用算法中的 pow2roundup 来查找大于或等于给定值的 2 的最小幂

private static Dictionary<int,double[]> scratchArrays = new Dictionary<int,double[]>();
/// Round up to next higher power of 2 (return x if it's already a power of 2).
public static int Pow2RoundUp (int x)
{
    if (x < 0)
        return 0;
    --x;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    x |= x >> 8;
    x |= x >> 16;
    return x+1;
}
private static double[] GetScratchArray(int size)
{
    int pow2 = Pow2RoundUp(size);
    if (!scratchArrays.ContainsKey(pow2))
    {
        scratchArrays.Add(pow2, new double[pow2]);
    }
    return scratchArrays[pow2];
}

编辑:线程安全版本:这仍然会有垃圾收集的东西,但它将是特定于线程的,并且开销应该少得多。

[ThreadStatic]
private static Dictionary<int,double[]> _scratchArrays;

private static Dictionary<int,double[]> scratchArrays
{
    get
    {
        if (_scratchArrays == null)
        {
            _scratchArrays = new Dictionary<int,double[]>();
        }
        return _scratchArrays;
    }
}

/// Round up to next higher power of 2 (return x if it's already a power of 2).
public static int Pow2RoundUp (int x)
{
    if (x < 0)
        return 0;
    --x;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    x |= x >> 8;
    x |= x >> 16;
    return x+1;
}
private static double[] GetScratchArray(int size)
{
    int pow2 = Pow2RoundUp(size);
    if (!scratchArrays.ContainsKey(pow2))
    {
        scratchArrays.Add(pow2, new double[pow2]);
    }
    return scratchArrays[pow2];
}
于 2013-03-31T04:21:29.587 回答