我不完全确定这是正确的,但我想我会把它扔掉以征求意见。首先,让我介绍一种无锁不相交集算法,它将构成我提出的算法的重要组成部分。
无锁不相交集算法
我假设在您选择的 CPU 架构上存在两个指针大小的比较和交换操作。这至少在 x86 和 x64 架构上可用。
该算法与 Wikipedia 页面上描述的单线程案例大致相同,但为了安全无锁操作进行了一些修改。首先,我们要求 rank 和 parent 元素都是指针大小的,并且在内存中与 2*sizeof(pointer) 对齐,以便稍后用于原子 CAS。
Find() 不需要改变;最坏的情况是路径压缩优化在同时存在写入者的情况下无法充分发挥作用。
然而, Union() 必须改变:
function Union(x, y)
redo:
x = Find(x)
y = Find(y)
if x == y
return
xSnap = AtomicRead(x) -- read both rank and pointer atomically
ySnap = AtomicRead(y) -- this operation may be done using a CAS
if (xSnap.parent != x || ySnap.parent != y)
goto redo
-- Ensure x has lower rank (meaning y will be the new root)
if (xSnap.rank > ySnap.rank)
swap(xSnap, ySnap)
swap(x, y)
-- if same rank, use pointer value as a fallback sort
else if (xSnap.rank == ySnap.rank && x > y)
swap(xSnap, ySnap)
swap(x, y)
yNew = ySnap
yNew.rank = max(yNew.rank, xSnap.rank + 1)
xNew = xSnap
xNew.parent = y
if (!CAS(y, ySnap, yNew))
goto redo
if (!CAS(x, xSnap, xNew))
goto redo
return
这应该是安全的,因为它永远不会形成循环,并且总是会导致正确的联合。我们可以通过观察来确认这一点:
- 首先,在终止之前,两个根中的一个总是以指向另一个的父级结束。因此,只要没有循环,合并就成功了。
- 其次,排名总是在增加。在比较 x 和 y 的顺序后,我们知道 x 在快照时的排名低于 y。为了形成一个循环,另一个线程需要首先增加 x 的等级,然后合并 x 和 y。但是在写入 x 的父指针的 CAS 中,我们检查 rank 没有改变;因此,y 的等级必须保持大于 x。
在同时发生突变的情况下,可能会增加 y 的等级,然后由于冲突而返回重做。但是,这意味着 y 不再是根(在这种情况下等级无关紧要),或者 y 的等级已被另一个过程增加(在这种情况下,第二次循环将没有效果并且 y 将具有正确的等级)。
因此,应该没有形成循环的机会,并且这种无锁不相交集算法应该是安全的。
现在开始解决您的问题...
假设
我假设山脊段只能在它们的端点处相交。如果不是这种情况,您将需要以某种方式更改阶段 1。
我还假设单个整数像素位置的共存足以连接山脊段。如果不是,您将需要在阶段 1 中更改数组以保存多个候选脊段 + 不相交集对,并过滤以找到实际连接的那些。
该算法中使用的不相交集合结构应在其结构中携带对线段的引用。在合并的情况下,我们任意选择两个记录片段中的一个来表示集合。
第一阶段:本地线路识别
我们首先将地图划分为多个扇区,每个扇区将作为单独的作业进行处理。多个作业可能在不同的线程中处理,但每个作业将仅由一个线程处理。如果一个山脊段穿过一个扇区,它将被分成两段,每个扇区一个。
对于每个扇区,建立一个将像素位置映射到不相交集结构的阵列。这个数组大部分会在以后被丢弃,所以它的内存需求不应该是太大的负担。
然后我们继续遍历扇区中的每个线段。我们首先选择一个不相交的集合来表示该线段构成的整条线。我们首先在像素位置数组中查找每个端点,看看是否已经分配了不相交的集合结构。如果端点之一已经在这个数组中,我们使用分配的不相交集。如果两者都在数组中,我们对不相交的集合执行合并,并使用新的根作为我们的集合。否则,我们创建一个新的不相交集,并将对当前线段的引用与不相交集结构相关联。然后,我们将每个端点的新不相交集的根写回像素位置数组。
对扇区中的每个线段重复此过程;到最后,我们将通过一个不相交的集合完全识别扇区内的所有线。
请注意,由于不相交的集合尚未在线程之间共享,因此还没有必要使用比较和交换操作;只需使用普通的单线程联合合并算法。由于在算法完成之前我们不会释放任何不相交的集合结构,因此也可以从每个线程的凹凸分配器进行分配,从而使内存分配(实际上)无锁且 O(1)。
一旦一个扇区被完全处理,像素位置数组中的所有数据都将被丢弃;然而,对应于扇区边缘像素的数据被复制到一个新的数组中,并为下一阶段保留。
由于迭代整个图像是 O(x*y),并且不相交合并实际上是 O(1),所以这个操作是 O(x*y) 并且需要工作内存 O(m+2*x*y/k+ k^2) = O(x*y/k+k^2),其中t为扇区数,k为扇区宽度,m为扇区内部分线段数(取决于如何经常有线条跨越边界,m可能会有很大的变化,但永远不会超过线段的数量)。转移到下一个操作的内存是 O(m + 2*x*y/k) = O(x*y/k)
第 2 阶段:跨行业合并
处理完所有扇区后,我们就开始合并跨扇区的行。对于扇区之间的每个边界,我们对跨越边界的线执行无锁合并操作(即,边界每侧的相邻像素已分配给线集)。
此操作的运行时间为 O(x+y) 并消耗 O(1) 内存(但是我们必须保留阶段 1 的内存)。完成后,可以丢弃边缘阵列。
第 3 阶段:收集线路
我们现在对所有分配的不相交集结构对象执行多线程映射操作。我们首先跳过任何不是根的对象(即,obj.parent != obj)。然后,从代表线段开始,我们从那里移出并收集和记录有关该线的任何所需信息。我们确信一次只有一个线程在查看任何给定的行,因为相交的行最终会以相同的不相交集结构结束。
这具有 O(m) 的运行时间和内存使用情况,具体取决于您需要收集有关这些线段的哪些信息。
概括
总的来说,这个算法应该有 O(x*y) 的运行时间和 O(x*y/k + k^2) 的内存使用。调整 k 可以在阶段 1 进程的瞬态内存使用与延续到阶段 2 的邻接数组和不相交集结构的长期内存使用之间进行权衡。
请注意,我还没有在现实世界中实际测试过这个算法的性能;也有可能我在上面的无锁不相交集联合合并算法中忽略了并发问题。欢迎评论:)