2008-09-18 102 views
3

我需要一个非常快速的算法来完成以下任务。我已经实现了几种完成它的算法,但它们对于我所需要的性能来说太慢了。它应该足够快,可以在现代CPU上运行至少10万次。它将用C++实现。帮助合并矢量的算法

我正在使用跨度/范围,一个结构有一个开始和一个结束坐标。

我有两个跨度的向量(动态数组),我需要合并它们。一个矢量是src,另一个是dst。矢量按跨度起点坐标排序,跨度在一个矢量内不重叠。

src向量中的跨度必须与dst向量中的跨度合并,使得生成的向量仍然排序并且没有重叠。 IE浏览器。如果在合并期间检测到重叠,则将两个跨度合并为一个。 (合并两个跨度只是改变结构中的坐标的问题。)

现在,还有一个catch,src向量中的跨度必须在合并期间“扩大”。这意味着一个常量将被添加到开始,另一个(更大的)常量将被添加到src中每个跨度的结束坐标。这意味着src跨度扩大后可能会重叠。


我到目前为止所做的是,它不能完全就地完成,需要某种临时存储。我认为在src和dst总结的元素数量的线性时间内应该是可行的。

任何临时存储都可能在算法的多次运行之间共享。

我曾尝试这两种主要方法,其是太慢,有:

  1. 追加的src到dst的所有元素,它追加之前加宽每个元素。然后运行就地排序。最后,使用“read”和“write”指针遍历结果向量,读指针在写指针前面运行,并在跨越时合并跨度。当所有元素都被合并(读指针到达结尾)时,dst被截断。

  2. 创建临时工作向量。通过反复从src或dst中选择下一个元素并合并到工作向量中,如上所述进行天真合并。完成后,将工作向量复制到dst,替换它。

第一种方法具有排序为O((M + N)*日志(M + N)),而不是O(M + N),并具有一定程度的开销的问题。这也意味着dst矢量必须增长得比实际需要的大得多。

第二个问题是大量复制和重新分配/释放内存的主要问题。

如果您认为需要,可以更改用于存储/管理跨度/矢量的数据结构。

更新:忘了说数据集有多大。最常见的情况是在任何一个向量中有4到30个元素,并且dst是空的,或者src和dst中跨度之间有大量重叠。

回答

0

第二种方法如何不重复分配 - 换句话说,分配您的临时向量一次,而不再分配它?或者,如果输入向量足够小(但不是常量大小),只需使用alloca而不是malloc。

此外,在速度方面,你可能想确保你的代码是使用CMOV的排序,因为如果代码实际上分支的归并的每一个迭代:

if(src1[x] < src2[x]) 
    dst[x] = src1[x]; 
else 
    dst[x] = src2[x]; 

的分支预测将在50%的时间内失败,这将对性能产生巨大影响。有条件的移动可能会好得多,所以确保编译器正在这样做,如果没有,尝试哄骗它这样做。

0

您在方法1中提到的排序可以缩减为线性时间(从描述它的对数线性算起),因为两个输入列表已经排序。只需执行合并排序的合并步骤。用适当的输入范围向量表示(例如单向链表),可以在原地完成。

http://en.wikipedia.org/wiki/Merge_sort

0

我不认为严格的线性解决方案是可行的,因为扩大SRC载体跨度可能在最坏的情况下导致他们都重叠(取决于常数的大小,你是增加)

这个问题可能在执行中,不在算法中;我建议剖析代码为您的解决方案之前,看看时间都花在

推理:

对于像英特尔的Core 2 Extreme QX9770真正的“现代”的CPU为3.2GHz运行,可以预计约59,455 MIPS

对于100,000个矢量,您必须在594,550个指令中处理每个矢量。这是很多指示。

裁判:wikipedia MIPS

除了

,注意添加常数到src矢量跨度不会去对它们进行排序,这样你就可以恢复正常的SRC矢量独立跨越,再与DST矢量跨越合并它们;这应该可以减少原始算法的工作量

+0

100k实际上可能是一个下冲,我还没有真正计算的数字。当我说“现代”的CPU时,我实际上在想“5年前的东西”,Athlon XP 3000+并不是不切实际的目标。 – jfs 2008-09-18 02:38:24

8

我们知道绝对最佳运行时运行时间是O(m + n),这是因为您至少必须扫描所有数据以便能够合并列表。鉴于此,你的第二种方法应该给你这种类型的行为。

你有没有介绍你的第二种方法来找出瓶颈?根据您所谈论的数据量,很可能在指定的时间内完成您想要的操作实际上是不可能的。验证这一点的一种方法是做一些简单的事情,比如总结循环中每个向量中跨度的所有开始和结束值以及时间。基本上,这里你正在为向量中的每个元素做最少量的工作。这将为您提供一个可获得最佳性能的基准。

除此之外,您可以避免通过使用stl swap方法将元素按元素复制,并且可以将temp矢量预先分配给一定的大小,以避免在合并元素时触发数组的扩展。

你可以考虑在你的系统中使用2个载体,每当你需要做的,你合并到使用的向量合并,然后交换(这类似于在图形中使用双缓冲)。这样,每次进行合并时都不必重新分配向量。

但是,您最好先分析并找出自己的瓶颈。如果分配与实际合并过程相比是最小的,则需要找出如何更快地完成分配。

一些可能的附加加速可能来自直接访问向量原始数据,这可以避免每次访问数据时的边界检查。

+0

感谢提醒我std :: swap,它可能实际上是交易断路器。我会回来后测试了这一点;) – jfs 2008-09-18 02:32:47

0

1是对的 - 完整排序比合并两个排序列表慢。

所以你看着调整2(或全新的东西)。

如果将数据结构更改为双向链接列表,则可以将它们合并到恒定的工作空间中。

对列表节点使用固定大小的堆分配器,以减少每个节点的内存使用量并提高节点在内存中靠近在一起的几率,从而减少页面丢失。

您可能能够在线或在您最喜欢的算法书中查找代码以优化链接列表合并。您需要对此进行自定义,以便在列表合并的同时进行跨度合并。要优化合并,首先要注意的是,对于每个运行的值来自同一侧而没有来自另一侧的值,您可以一次将整个运行插入到dst列表中,而不是将每个节点插入转。并且,您可以将每个插入的写入操作保存在正常的列表操作中,只需将结尾保留为“悬挂”即可,并知道您将在稍后对其进行修补。并且假设您不会在应用程序的其他任何位置执行删除操作,则列表可以单链接,这意味着每个节点只写一个。

至于10微秒运行时 - 那种取决于n和m ...

0

如果您最近实施仍是不够快,你可能最终不得不看的替代方法。

你使用这个函数的输出是什么?

+0

那么你错过了一件事,不只是一个'三角洲',有两个。跨度的左侧和右侧添加了不同的值,特别是右侧的值比左侧的值大。 – jfs 2008-09-18 03:06:30

0

我为这个算法编写了一个新的容器类,根据需要量身定制。这也给了我一个机会来调整我的程序周围的其他代码,同时有一点速度提升。

这比使用STL向量的旧实现速度快得多,但它基本上是相同的东西。但尽管速度更快,但速度还不够快......不幸的是。

分析并不能揭示什么是真正的瓶颈。该MSVC探查似乎有时放在了错误的所谓的“怪”(据说相同运行中广泛分配不同的运行时间)和大多数呼叫越来越合并成一个大的缝隙。

纵观生成的代码的反汇编显示,有一个非常大的量生成的代码跳跃的,我认为这可能是缓慢的主要原因吧。

class SpanBuffer { 
private: 
    int *data; 
    size_t allocated_size; 
    size_t count; 

    inline void EnsureSpace() 
    { 
     if (count == allocated_size) 
      Reserve(count*2); 
    } 

public: 
    struct Span { 
     int start, end; 
    }; 

public: 
    SpanBuffer() 
     : data(0) 
     , allocated_size(24) 
     , count(0) 
    { 
     data = new int[allocated_size]; 
    } 

    SpanBuffer(const SpanBuffer &src) 
     : data(0) 
     , allocated_size(src.allocated_size) 
     , count(src.count) 
    { 
     data = new int[allocated_size]; 
     memcpy(data, src.data, sizeof(int)*count); 
    } 

    ~SpanBuffer() 
    { 
     delete [] data; 
    } 

    inline void AddIntersection(int x) 
    { 
     EnsureSpace(); 
     data[count++] = x; 
    } 

    inline void AddSpan(int s, int e) 
    { 
     assert((count & 1) == 0); 
     assert(s >= 0); 
     assert(e >= 0); 
     EnsureSpace(); 
     data[count] = s; 
     data[count+1] = e; 
     count += 2; 
    } 

    inline void Clear() 
    { 
     count = 0; 
    } 

    inline size_t GetCount() const 
    { 
     return count; 
    } 

    inline int GetIntersection(size_t i) const 
    { 
     return data[i]; 
    } 

    inline const Span * GetSpanIteratorBegin() const 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<const Span *>(data); 
    } 

    inline Span * GetSpanIteratorBegin() 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<Span *>(data); 
    } 

    inline const Span * GetSpanIteratorEnd() const 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<const Span *>(data+count); 
    } 

    inline Span * GetSpanIteratorEnd() 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<Span *>(data+count); 
    } 

    inline void MergeOrAddSpan(int s, int e) 
    { 
     assert((count & 1) == 0); 
     assert(s >= 0); 
     assert(e >= 0); 

     if (count == 0) 
     { 
      AddSpan(s, e); 
      return; 
     } 

     int *lastspan = data + count-2; 

     if (s > lastspan[1]) 
     { 
      AddSpan(s, e); 
     } 
     else 
     { 
      if (s < lastspan[0]) 
       lastspan[0] = s; 
      if (e > lastspan[1]) 
       lastspan[1] = e; 
     } 
    } 

    inline void Reserve(size_t minsize) 
    { 
     if (minsize <= allocated_size) 
      return; 

     int *newdata = new int[minsize]; 

     memcpy(newdata, data, sizeof(int)*count); 

     delete [] data; 
     data = newdata; 

     allocated_size = minsize; 
    } 

    inline void SortIntersections() 
    { 
     assert((count & 1) == 0); 
     std::sort(data, data+count, std::less<int>()); 
     assert((count & 1) == 0); 
    } 

    inline void Swap(SpanBuffer &other) 
    { 
     std::swap(data, other.data); 
     std::swap(allocated_size, other.allocated_size); 
     std::swap(count, other.count); 
    } 
}; 


struct ShapeWidener { 
    // How much to widen in the X direction 
    int widen_by; 
    // Half of width difference of src and dst (width of the border being produced) 
    int xofs; 

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call 
    SpanBuffer buffer; 

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst); 

    ShapeWidener(int _xofs) : xofs(_xofs) { } 
}; 


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst) 
{ 
    if (src.GetCount() == 0) return; 
    if (src.GetCount() + dst.GetCount() == 0) return; 

    assert((src.GetCount() & 1) == 0); 
    assert((dst.GetCount() & 1) == 0); 

    assert(buffer.GetCount() == 0); 

    dst.Swap(buffer); 

    const int widen_s = xofs - widen_by; 
    const int widen_e = xofs + widen_by; 

    size_t resta = src.GetCount()/2; 
    size_t restb = buffer.GetCount()/2; 
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin(); 
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin(); 

    while (resta > 0 || restb > 0) 
    { 
     if (restb == 0) 
     { 
      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); 
      --resta, ++spa; 
     } 
     else if (resta == 0) 
     { 
      dst.MergeOrAddSpan(spb->start, spb->end); 
      --restb, ++spb; 
     } 
     else if (spa->start < spb->start) 
     { 
      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); 
      --resta, ++spa; 
     } 
     else 
     { 
      dst.MergeOrAddSpan(spb->start, spb->end); 
      --restb, ++spb; 
     } 
    } 

    buffer.Clear(); 
} 
+0

尝试使用deque而不是vector。 Deque的内存分配较少,但不以连续的内存为代价。 – moswald 2008-09-18 06:28:22

0

我会始终保持我的向量跨度排序。这使得实现算法更容易 - 并且可能在线性时间内完成。

  • 跨度最小递增的顺序
  • 然后跨越最大的递减顺序

您需要创建一个函数来做到:

OK,所以我跨度基于排序那。

然后,我会使用std :: set_union来合并向量(您可以在继续之前合并多个向量)。

然后对于每个连续的套具有相同的最低跨度,你保持第一,并删除其余的(他们是第一跨度的子跨度)。

然后你需要合并你的跨度。在线性时间内,这应该是现在和可行的。

好的,现在就是这个把戏。不要试图做到这一点。使用一个或多个临时向量(并提前预留足够的空间)。然后在最后,调用std :: vector :: swap将结果放入您选择的输入向量中。

我希望这足以让你走。

0

你的目标系统是什么?它是多核?如果是这样,你可以考虑使用多线程算法

+0

我的目标系统是过去5年左右的桌面系统,我不能假设任何有关SMP支持或SIMD指令集的内容。 (我可以假设x86上有MMX,但就是这样。) – jfs 2008-09-18 20:58:48