2

我试图更多地了解 go 的通道和 goroutines,所以我决定制作一个小程序来计算文件中的单词,由bufio.NewScanner对象读取:

nCPUs := flag.Int("cpu", 2, "number of CPUs to use")
flag.Parse()
runtime.GOMAXPROCS(*nCPUs)    

scanner := bufio.NewScanner(file)
lines := make(chan string)
results := make(chan int)

for i := 0; i < *nCPUs; i++ {
    go func() {
        for line := range lines {
            fmt.Printf("%s\n", line)
            results <- len(strings.Split(line, " "))
        }
    }()
}

for scanner.Scan(){
    lines <- scanner.Text()
}
close(lines)


acc := 0
for i := range results {
      acc += i
 }

fmt.Printf("%d\n", acc)

现在,在我迄今为止发现的大多数示例中,通道linesresults通道都会被缓冲,例如make(chan int, NUMBER_OF_LINES_IN_FILE). 尽管如此,在运行此代码后,我的程序仍然存在并显示fatal error: all goroutines are asleep - deadlock!错误消息。

基本上我的想法是我需要两个通道:一个与 goroutine 通信文件中的行(因为它可以是任何大小,我不喜欢认为我需要在make(chan)函数调用中通知大小。其他通道会从 goroutine 收集结果,在主函数中我会用它来计算一个累积的结果。

使用 goroutine 和通道以这种方式编程的最佳选择应该是什么?任何帮助深表感谢。

4

2 回答 2

7

正如@AndrewN 所指出的,问题是每个 goroutine 都到达了它试图发送到results通道的点,但是这些发送会阻塞,因为通道是无缓冲的,并且在循环results之前没有任何东西从它们读取。for i := range results您永远不会进入该循环,因为您首先需要完成for scanner.Scan()循环,该循环试图将所有lines 发送到lines通道,该通道被阻塞,因为 goroutine 永远不会循环回到 ,range lines因为它们被卡住发送到results.

要解决此问题,您可能会尝试做的第一件事是将这些scanner.Scan()内容放入 goroutine 中,以便可以立即开始从results通道中读取内容。但是,您将遇到的下一个问题是知道何时结束for i := range results循环。你想要关闭results通道,但只有在原始 goroutine 完成读取lines通道之后。您可以results在关闭频道后立即关闭lines频道,但是我认为这可能会引入潜在的竞争,所以最安全的做法是在关闭频道之前等待原始的两个 goroutine 完成results:(游乐场链接):

package main

import "fmt"
import "runtime"
import "bufio"
import "strings"
import "sync"

func main() {
    runtime.GOMAXPROCS(2)

    scanner := bufio.NewScanner(strings.NewReader(`
hi mom
hi dad
hi sister
goodbye`))
    lines := make(chan string)
    results := make(chan int)

    wg := sync.WaitGroup{}
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            for line := range lines {
                fmt.Printf("%s\n", line)
                results <- len(strings.Split(line, " "))
            }
            wg.Done()
        }()
    }

    go func() {
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        close(lines)
        wg.Wait()
        close(results)
    }()

    acc := 0
    for i := range results {
        acc += i
    }

    fmt.Printf("%d\n", acc)
}
于 2015-09-29T05:10:57.873 回答
5

go 中的通道默认情况下是无缓冲的,这意味着在您开始尝试从该通道接收之前,您生成的任何匿名 goroutine 都不能发送到结果通道。这不会在主程序中开始执行,直到scanner.Scan()完成填充线路通道......在您的匿名函数可以发送到结果通道并重新启动它们的循环之前,它被阻止执行。僵局。

您的代码中的另一个问题,即使通过缓冲通道来简单地解决上述问题,因为通道尚未关闭,一旦没有更多结果输入, for i := range 结果也会死锁。

编辑:这是一个潜在的解决方案,如果你想避免缓冲通道。基本上,第一个问题是通过一个新的 goroutine 执行到结果通道的发送来避免的,从而允许行循环完成。第二个问题(不知道何时停止读取通道)可以通过在创建 goroutine 时对其进行计数并在每个 goroutine 都被计算在内时显式关闭通道来避免。对等待组做类似的事情可能会更好,但这只是展示如何无缓冲地执行此操作的一种非常快速的方法。

于 2015-09-29T03:15:31.243 回答