8

我有代码,其中单个 goroutine 将触发不确定数量的子 goroutine,这反过来又会触发更多 goroutine,等等。我的目标是等待所有子 goroutine 完成。

我不知道我将提前触发的 goroutine 的总数,所以我不能使用sync.WaitGroup,理想情况下我不必人为地限制通过channel-as-semaphore模式运行的 goroutine 的总数.

简而言之,我想在每个 goroutine 中都有一个本地通道或等待组,作为一个信号量来等待它的所有子进程,但这会导致每个 goroutine 在其所有后代完成时都在消耗堆栈空间。

现在我的想法是在 goroutine 被触发时增加一个原子计数器(在父级中,如果子级在父级完成后开始运行,以避免虚假地达到零),在 goroutine 完成时减少它,并定期检查它是否相等为零。

我基本上是在正确的轨道上,还是有更优雅的解决方案?

4

3 回答 3

11

我编写了 的第一个实现sync.WaitGroup,并且很好地支持了这个和其他边缘情况。从那时起,Dmitry 改进了实施,鉴于他的记录,我敢打赌他只会让它更安全。

特别是,您可以相信,如果当前有一个或多个被阻止的Wait呼叫,然后您在 callAdd之前使用正 delta 进行呼叫Done,您将不会取消阻止任何先前存在的Wait呼叫。

所以你绝对可以这样做,例如:

var wg sync.WaitGroup
wg.Add(1)
go func() {
    wg.Add(1)
    go func() {
        wg.Done()
    }()
    wg.Done()
}()
wg.Wait()

自从代码首次集成以来,我实际上在生产中使用了等效逻辑。

作为参考,这个内部注释是在第一个实现中放置的,并且仍然存在:

// WaitGroup creates a new semaphore each time the old semaphore
// is released. This is to avoid the following race:
//
// G1: Add(1)
// G1: go G2()
// G1: Wait() // Context switch after Unlock() and before Semacquire().
// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
// G3: Add(1) // Makes counter == 1, waiters == 0.
// G3: go G4()
// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.

G1这描述了在接触实现时要记住的不同竞争条件,但请注意,即使在Add(1) + go f()G3.

不过,我理解您的问题,因为最近发布的文档中确实有一个令人困惑的声明,但让我们看看评论的历史,看看它实际解决了什么问题。

Russ 在修订版 15683 中将评论放在那里:

(...)
+// Note that calls with positive delta must happen before the call to Wait,
+// or else Wait may wait for too small a group. Typically this means the calls
+// to Add should execute before the statement creating the goroutine or
+// other event to be waited for. See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {

来自 Russ 的日志评论指出:

同步:添加关于呼叫位置的注意事项 (*WaitGroup)。添加

修复问题 4762。

如果我们阅读问题 4762,我们会发现:

可能值得在 sync.WaitGroup 的文档中添加一个明确的注释,即在启动包含对 Done 的调用的 go 例程之前应该完成对 Add 的调用。

所以文档实际上是对这样的代码发出警告:

var wg sync.WaitGroup
wg.Add(1)
go func() {
    go func() {
        wg.Add(1)
        wg.Done()
    }()
    wg.Done()
}()
wg.Wait()

这确实是坏掉了。应该改进评论以使其更具体,并避免您在阅读时产生的看似合理但具有误导性的理解。

于 2013-09-14T23:30:59.527 回答
0

当然,您可以sync.WaitGroup其用于您的任务,它实际上非常适合,专为此而设计。您将创建的 goroutine 的数量不是不确定的。它只是一个仅在运行时才知道的值,然后它就被精确地知道了。每个go语句都会创建一个新的 goroutine。这样go的语句之前,不管它会被执行多少次,你都必须做

wg.Add(1)

并在每个goroutine中放置

defer wg.Done()

作为第一个声明。现在你可以做

wg.Wait

等待你所有的 goroutine 完成。

于 2013-09-14T19:16:36.003 回答
0

我爱WaitGroup的简单。我不喜欢的一件事WaitGroup是必须在你的 goroutine 中传递对它的引用,因为你会将并发逻辑与业务逻辑混合在一起。此外,在您的情况下,如果您不小心,它可能会变得更加复杂和容易出错。

所以我想出了这个通用函数来为我解决这个问题:

// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
    var waitGroup sync.WaitGroup
    waitGroup.Add(len(functions))

    defer waitGroup.Wait()

    for _, function := range functions {
        go func(copy func()) {
            defer waitGroup.Done()
            copy()
        }(function)
    }
}

所以这是我将如何使用它来解决您的问题:

func1 := func() {
        for char := 'a'; char < 'a' + 3; char++ {
            fmt.Printf("%c ", char)
        }
}

func2 := func() {
        for number := 1; number < 4; number++ {
            fmt.Printf("%d ", number)
        }
}

func3 := func() {
        Parallelize(func1, func2)
}

Parallelize(func3, func3)  // a a 1 1 b b 2 2 c c 3 3

如果你想使用它,你可以在这里找到它https://github.com/shomali11/util

于 2017-06-08T02:08:38.387 回答