我正在开发一个分阶段的工作系统,目前工作如下(伪代码):
cyclicbarrier.init(numthreads)
on each thread:
for each stage s:
loop forever:
pop job from joblist[s]
if no job:
break
execute job
wait at cyclicbarrier
这会导致阶段中的所有作业在进入下一个阶段之前完成执行。
cyclicbarrier
使用两个信号量实现:
sem1(0)
sem2(0)
n = 0
function wait:
if atomic_incr(n) == maxthreads:
sem1.signal(maxthreads)
sem1.wait()
if atomic_decr(n) == 0:
sem2.signal(maxthreads)
sem2.wait()
我想添加对作业的支持,以便能够将 1 个或多个作业添加到当前运行的阶段,并让这些作业在阶段继续之前也执行。上面的代码将执行此操作,但是如果在线程到达屏障后添加此作业,那么它将以次优方式执行,因为等待线程可能正在执行它但不是。
我将问题归结为需要一种混合semaphore
/ cyclicbarrier
,为方便起见,我将其称为sembarrier
. 以下是它的功能:
function wait:
n--
if n < 0 and n > -maxthreads:
suspend until signaled
else if n <= -maxthreads:
signal(maxthreads)
本质上,这sembarrier
就像 a 一样工作semaphore
,但是当有足够多的线程在它上面等待时,它的行为就像 acyclicbarrier
并释放它们。
所以,这是使用sembarrier
.
sembarrier
构造函数看起来像:
sembarrier(initial_value, num_threads)
sembarrier
为每个阶段创建一个:
n = num_theads
for each stage s:
sembarrier[s].init(0, n)
这将是修改后的循环,使用sembarrier
数组:
on each thread:
for each stage s:
loop forever:
sembarrier[s].wait()
pop job from joblist[s]
if no job:
break;
execute job
此外,当添加到joblist
:
function AddJob(stage s, job j):
joblist[s].push(j)
sembarrier[s].signal()
问题:
1)我怎样才能有效地实施sembarrier
?我可以访问基本的并发结构,例如互斥体和信号量。我也有标准的原子操作。
2)我原来的问题有替代解决方案吗?
谢谢!