5

我正在练习使用通道来实现队列。具体来说,我正在尝试使用通道的大小来限制同时运行的 goroutine 的数量。也就是说,我编写了以下代码:

package main

import "fmt"
import "time"
import "math/rand"

func runTask (t string, ch *chan bool) {
        start := time.Now()
        fmt.Println("starting task", t)
        time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
        fmt.Println("done running task", t, "in", time.Since(start))
        <- *ch
}

func main() {
        numWorkers := 3
        files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

        activeWorkers := make(chan bool, numWorkers)

        for _, f := range files {
                activeWorkers <- true
                fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
                go runTask(f, &activeWorkers)
        }
        select{}
}

现在,代码崩溃了:

throw: all goroutines are asleep - deadlock!

我的期望是对 select 的调用将永远阻塞,并让 goroutine 终止而不会出现死锁。

所以我有一个双重问题:为什么不选择永远阻塞,并且在 for 循环之后没有抛出 time.Sleep() 调用,我怎样才能避免死锁?

干杯,

-mtw

4

3 回答 3

6

Arlen Cuss 已经写了一个很好的答案。我只是想为您的工作队列建议另一种设计。除了限制通道可以缓冲的条目数量之外,您还可以只生成有限数量的工作 goroutine,这感觉更自然。像这样的东西:

package main

import "fmt"
import "time"
import "math/rand"

func runTask(t string) string {
    start := time.Now()
    fmt.Println("starting task", t)
    time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
    fmt.Println("done running task", t, "in", time.Since(start))
    return t
}

func worker(in chan string, out chan string) {
    for t := range in {
        out <- runTask(t)
    }
}

func main() {
    numWorkers := 3

    // spawn workers
    in, out := make(chan string), make(chan string)
    for i := 0; i < numWorkers; i++ {
        go worker(in, out)
    }

    files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

    // schedule tasks
    go func() {
        for _, f := range files {
            in <- f
        }
    }()

    // get results
    for _ = range files {
        <-out
    }
}

如果您只想等到所有任务都执行完毕,您也可以使用sync.WaitGroupout ,但使用通道的好处是您可以稍后聚合结果。例如,如果每个任务都返回该文件中的字数,那么最后的循环可能用于汇总所有单个字数。

于 2012-04-16T10:58:16.360 回答
4

首先,您不需要传递指向通道的指针;通道,如地图和其他,是引用,这意味着基础数据不会被复制,只是指向实际数据的指针。如果你需要一个指向 a本身的指针,你就会知道何时到来。 chan

发生崩溃是因为程序进入了每个 goroutine 都被阻塞的状态。这应该是不可能的;如果每个 goroutine 都被阻塞了,那么就没有可能的进程来唤醒另一个 goroutine(因此您的程序将被挂起)。

主要的 goroutine 结束了select {}——不等待任何人,只是挂起。一旦最后一个runTaskgoroutine 完成,就只剩下主 goroutine 了,它不等待任何人。

您需要添加一些方法来了解每个 goroutine 何时完成;也许另一个频道可以接收完成事件。

这有点丑陋,但可能是一些灵感。

package main

import "fmt"
import "time"
import "math/rand"

func runTask(t string, ch chan bool, finishedCh chan bool) {
    start := time.Now()
    fmt.Println("starting task", t)
    time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
    fmt.Println("done running task", t, "in", time.Since(start))
    <-ch
    finishedCh <- true
}

func main() {
    numWorkers := 3
    files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

    activeWorkers := make(chan bool, numWorkers)
    finishedWorkers := make(chan bool)
    done := make(chan bool)

    go func() {
        remaining := len(files)
        for remaining > 0 {
            <-finishedWorkers
            remaining -= 1
        }

        done <- true
    }()

    for _, f := range files {
        activeWorkers <- true
        fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
        go runTask(f, activeWorkers, finishedWorkers)
    }

    <-done
}
于 2012-04-16T10:25:45.077 回答
2

tux21b 已经发布了一个更惯用的解决方案,但我想以不同的方式回答您的问题。select{} 确实会永远阻塞,是的。当所有 goroutine 都被阻塞时,就会发生死锁。如果你所有其他的 goroutine 都完成了,那么你只剩下阻塞的 main goroutine,这是一个死锁。

通常,您希望在所有其他 goroutine 都完成后在您的主 goroutine 中执行某些操作,或者通过使用它们的结果,或者只是清理,为此您将执行 tux21b 建议的操作。如果你真的只是想让 main 完成并让其余的 goroutine 完成它们的工作,请将其放在defer runtime.Goexit()main 函数的顶部。这将导致它退出而不退出程序。

于 2012-04-16T21:38:25.377 回答