0

我正在玩一些代码以用于学习目的,并且在使用-race标志时我得到了执行的竞争条件,我想了解原因。代码启动了一组固定的 goroutines,这些 goroutines 充当工作人员从通道消费任务,没有固定数量的任务,只要通道接收工作人员必须继续工作的任务。

调用WaitGroup函数时出现竞争条件。据我了解(查看数据竞争报告),当第一次wg.Add调用由一个衍生的 goroutine 和主例程调用wg.Wait同时执行时,就会发生竞争条件。那是对的吗?如果是,这意味着我必须始终在主例程上执行对 Add 的调用以避免这种资源竞争?但是,这也意味着我需要知道工作人员需要提前处理多少任务,如果我需要代码处理工作人员运行后可能出现的任何数量的任务,这会很糟糕......

编码:

func Test(t *testing.T) {
    t.Run("", func(t *testing.T) {
        var wg sync.WaitGroup
        queuedTaskC := make(chan func())
        for i := 0; i < 5; i++ {
            wID := i + 1
            go func(workerID int) {
                for task := range queuedTaskC {
                    wg.Add(1)
                    task()
                }
            }(wID)
        }

        taskFn := func() {
            fmt.Println("executing task...")
            wg.Done()
        }
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn
        queuedTaskC <- taskFn

        wg.Wait()
        close(queuedTaskC)

        fmt.Println(len(queuedTaskC))
    })
}

那个报告:

==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================
4

1 回答 1

3

WaitGroup实现是基于由AddDone方法改变的内部计数器。在计数器归零之前,该Wait方法不会返回。也可以重用WaitGroup,但在文档中描述的某些条件下:

// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.

尽管您的代码没有重复使用wg,但它可以WaitGroup多次将计数器归零。当在给定时间没有处理任何任务时会发生这种情况,这在并发代码中是完全可能的。而且由于您的代码在调用之前不会等待Wait返回,Add因此您会收到竞争条件错误。

正如每个人在评论中建议的那样,您应该放弃跟踪任务的想法,WaitGroup转而控制正在运行的 goroutine。附上代码提案。

func Test(t *testing.T) {
    var wg sync.WaitGroup
    queuedTaskC := make(chan func(), 10)
    for i := 0; i < 5; i++ {
        wID := i + 1
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for task := range queuedTaskC {
                task()
            }
        }(wID)
    }
    for i := 0; i < 10; i++ {
        queuedTaskC <- func() {
            fmt.Println("executing task...")
        }
    }
    close(queuedTaskC)
    wg.Wait()
    fmt.Println(len(queuedTaskC))
}
于 2021-09-07T17:39:52.630 回答