4

我需要一个非常快速的算法来完成以下任务。我已经实现了几种完成它的算法,但是它们对于我需要的性能来说都太慢了。它应该足够快,以至于算法可以在现代 CPU 上每秒至少运行 100,000 次。它将在 C++ 中实现。

我正在使用跨度/范围,这是一种在一条线上具有起点和终点坐标的结构。

我有两个跨度向量(动态数组),我需要合并它们。一个向量是 src,另一个是 dst。向量按跨度起始坐标排序,跨度在一个向量内不重叠。

src 向量中的 span 必须与 dst 向量中的 span 合并,这样得到的向量仍然是排序的并且没有重叠。IE。如果在合并期间检测到重叠,则将两个跨度合并为一个。(合并两个跨度只是改变结构中的坐标的问题。)

现在,还有一个问题,src 向量中的跨度必须在合并期间“加宽”。这意味着将在 src 中每个跨度的开始坐标添加一个常量,并将另一个(更大的)常量添加到结束坐标。这意味着在 src 跨度扩大后,它们可能会重叠。


到目前为止,我得出的结论是它不能完全就地完成,需要某种临时存储。我认为它应该在线性时间超过 src 和 dst 总和的元素数量。

任何临时存储都可能在算法的多次运行之间共享。

我尝试过的两种太慢的主要方法是:

  1. 将 src 的所有元素附加到 dst,在附加之前扩展每个元素。然后运行就地排序。最后,使用“读”和“写”指针遍历结果向量,读指针在写指针之前运行,在进行时合并跨度。当所有元素都被合并(读指针到达末尾) dst 被截断。

  2. 创建一个临时工作向量。通过重复从 src 或 dst 中选择下一个元素并合并到工作向量中来进行如上所述的简单合并。完成后,将工作向量复制到 dst 并替换它。

第一种方法的问题是排序是 O((m+n)*log(m+n)) 而不是 O(m+n) 并且有一些开销。这也意味着 dst 向量必须增长得比它真正需要的大得多。

第二个主要问题是大量复制并再次分配/释放内存。

如果您认为需要,可以更改用于存储/管理跨度/向量的数据结构。

更新:忘了说数据集有多大。最常见的情况是任一向量中有 4 到 30 个元素,并且 dst 为空或 src 和 dst 中的跨度之间存在大量重叠。

4

9 回答 9

8

我们知道绝对最佳情况的运行时间是 O(m+n),这是因为您至少必须扫描所有数据才能合并列表。鉴于此,您的第二种方法应该为您提供这种类型的行为。

你有没有分析过你的第二种方法来找出瓶颈是什么?很有可能,根据您所谈论的数据量,实际上不可能在指定的时间内做您想做的事情。验证这一点的一种方法是做一些简单的事情,比如在一个循环中总结每个向量中跨度的所有开始和结束值,然后计算时间。基本上,您在这里为向量中的每个元素做最少的工作。这将为您提供可以预期获得的最佳性能的基准。

除此之外,您可以避免使用 stl swap 方法逐个元素地复制向量,并且可以将临时向量预分配到一定大小,以避免在合并元素时触发数组的扩展。

您可能会考虑在您的系统中使用 2 个向量,并且每当您需要进行合并时,您会合并到未使用的向量中,然后交换(这类似于图形中使用的双缓冲)。这样您就不必在每次进行合并时重新分配向量。

但是,您最好先进行分析并找出瓶颈所在。如果与实际合并过程相比分配最小,那么您需要弄清楚如何使其更快。

一些可能的额外加速可能来自直接访问向量原始数据,这避免了每次访问数据时的边界检查。

于 2008-09-18T02:26:16.877 回答
0

我会始终保持我的跨度向量排序。这使得实现算法变得更加容易——并且可以在线性时间内完成。

好的,所以我会根据以下内容对跨度进行排序:

  • 跨度最小值按递增顺序
  • 然后按降序跨越最大值

您需要创建一个函数来执行此操作。

然后我会使用 std::set_union 来合并向量(你可以在继续之前合并多个)。

然后对于具有相同最小值的每组连续跨度,您保留第一个并删除其余的(它们是第一个跨度的子跨度)。

然后你需要合并你的跨度。这现在应该是相当可行的,并且在线性时间内是可行的。

好的,现在的诀窍就在这里。不要尝试就地执行此操作。使用一个或多个临时向量(并提前预留足够的空间)。然后在最后,调用 std::vector::swap 将结果放入您选择的输入向量中。

我希望这足以让你继续前进。

于 2008-09-18T04:01:33.450 回答
0

1 是正确的 - 完整排序比合并两个排序列表要慢。

所以你正在考虑调整 2(或全新的东西)。

如果将数据结构更改为双向链表,则可以将它们合并到恒定的工作空间中。

为列表节点使用固定大小的堆分配器,既可以减少每个节点的内存使用,又可以提高节点在内存中靠近的机会,从而减少页面未命中。

您也许可以在线或在您最喜欢的算法书中找到代码来优化链表合并。您需要对其进行自定义,以便在列表合并的同时进行跨度合并。

要优化合并,首先请注意,对于来自同一侧但没有来自另一侧的值的每次运行,您可以一次将整个运行插入到 dst 列表中,而不是依次插入每个节点。并且您可以在正常的列表操作中每次插入保存一次写入,方法是让末尾“悬空”,知道您稍后会对其进行修补。并且如果您不在应用程序的其他任何地方进行删除,则该列表可以是单链接的,这意味着每个节点一次写入。

至于 10 微秒的运行时间 - 取决于 n 和 m ...

于 2008-09-18T02:42:44.747 回答
0

如果您最近的实现仍然不够快,您可能最终不得不寻找替代方法。

你用这个函数的输出做什么?

于 2008-09-18T02:54:28.837 回答
0

没有重复分配的第二种方法怎么样 - 换句话说,分配你的临时向量一次,不再分配它?或者,如果输入向量足够小(但不是恒定大小),只需使用 alloca 而不是 malloc。

此外,就速度而言,您可能希望确保您的代码使用 CMOV 进行排序,因为如果代码实际上是为合并排序的每个迭代进行分支:

if(src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

分支预测将有 50% 的时间失败,这将对性能产生巨大影响。有条件的移动可能会做得更好,因此请确保编译器正在这样做,如果没有,请尝试诱使它这样做。

于 2008-09-18T02:17:52.263 回答
0

您在方法 1 中提到的排序可以减少到线性时间(从您描述的对数线性),因为两个输入列表已经排序。只需执行合并排序的合并步骤。通过输入跨度向量的适当表示(例如单链表),这可以就地完成。

http://en.wikipedia.org/wiki/Merge_sort

于 2008-09-18T02:22:54.340 回答
0

我认为不可能有严格的线性解决方案,因为在最坏的情况下,扩大 src 向量跨度可能会导致它们全部重叠(取决于您添加的常数的大小)

问题可能出在实现上,而不是算法上;我建议为您之前的解决方案分析代码,以查看时间花在了哪里

推理:

对于像运行在 3.2GHz 的 Intel Core 2 Extreme QX9770 这样的真正“现代”CPU,可以预期大约 59,455 MIPS

对于 100,000 个向量,您必须在 594,550 个指令中处理每个向量。这是很多指令。

参考:维基百科 MIPS

另外,请注意,向 src 向量 spans 添加一个常数不会对它们进行解排序,因此您可以独立地对 src 向量 spans 进行归一化,然后将它们与 dst 向量 spans 合并;这应该会减少原始算法的工作量

于 2008-09-18T02:25:38.157 回答
0

我为这个算法编写了一个新的容器类,根据需要量身定制。这也让我有机会围绕我的程序调整其他代码,同时提高了一点速度。

这比使用 STL 向量的旧实现要快得多,但在其他方面基本相同。但是虽然它更快,但它仍然不够快......不幸的是。

分析不再揭示真正的瓶颈是什么。MSVC 分析器似乎有时会将“责任”归咎于错误的调用(假设相同的运行分配了大不相同的运行时间),并且大多数调用被合并成一个大裂缝。

查看生成的代码的反汇编表明生成的代码中有大量的跳转,我认为这可能是现在缓慢的主要原因。

class SpanBuffer {
private:
    int *data;
    size_t allocated_size;
    size_t count;

    inline void EnsureSpace()
    {
        if (count == allocated_size)
            Reserve(count*2);
    }

public:
    struct Span {
        int start, end;
    };

public:
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    inline void AddIntersection(int x)
    {
        EnsureSpace();
        data[count++] = x;
    }

    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        EnsureSpace();
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    inline void Clear()
    {
        count = 0;
    }

    inline size_t GetCount() const
    {
        return count;
    }

    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            AddSpan(s, e);
        }
        else
        {
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    ShapeWidener(int _xofs) : xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;
    if (src.GetCount() + dst.GetCount() == 0) return;

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}
于 2008-09-18T03:15:16.010 回答
0

你的目标系统是什么?是多核的吗?如果是这样你可以考虑多线程这个算法

于 2008-09-18T16:38:42.723 回答