1

我有这段代码,它使用输入和输出通道以及相关的 WaitGroups 同时运行一个函数,但我被告知我做错了一些事情。这是代码:

func main() {
    concurrency := 50
    var tasksWG sync.WaitGroup
    tasks := make(chan string)
    output := make(chan string)

    for i := 0; i < concurrency; i++ {
        tasksWG.Add(1)

        // evidentally because I'm processing tasks in a groutine then I'm not blocking and I end up closing the tasks channel almost immediately and stopping tasks from executing
        go func() {
            for t := range tasks {
                output <- process(t)
                continue
            }
            tasksWG.Done()
        }()
    }

    var outputWG sync.WaitGroup
    outputWG.Add(1)
    go func() {
        for o := range output {
            fmt.Println(o)
        }
        outputWG.Done()
    }()

    go func() {
        // because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
        tasksWG.Wait()
        close(output)
    }()

    f, err := os.Open(os.Args[1])
    if err != nil {
        log.Panic(err)
    }

    s := bufio.NewScanner(f)
    for s.Scan() {
        tasks <- s.Text()
    }

    close(tasks)
    // and finally the output wait group finishes almost immediately as well because tasks gets closed right away due to my improper use of goroutines
    outputWG.Wait()
}

func process(t string) string {
    time.Sleep(3 * time.Second)
    return t
}

我已经在评论中指出我执行错误的地方。现在这些评论对我来说很有意义。有趣的是,这段代码确实看起来是异步运行的,并且大大加快了执行速度。我想了解我做错了什么,但是当代码似乎以异步方式执行时,我很难理解它。我很想更好地理解这一点。

4

1 回答 1

1

您的主要 goroutine 正在按顺序执行几件事情,而其他事情正在同时执行,所以我认为您的执行顺序已关闭

    f, err := os.Open(os.Args[1])
    if err != nil {
        log.Panic(err)
    }

    s := bufio.NewScanner(f)
    for s.Scan() {
        tasks <- s.Text()
    }

你不应该把它移到顶部吗?因此,您将值发送到任务

然后让你的循环在名为 for 循环的并发中跨越任务 50 次(你希望在调用覆盖它的代码之前在任务中有一些东西)


go func() {
    // because of what was mentioned in the previous comment, the tasks wait group finishes almost immediately which then closes the output channel almost immediately as well which ends ranging over output early
    tasksWG.Wait()
    close(output)
}()

这里的逻辑让我感到困惑,你正在生成一个 goroutine 来等待等待组,所以这里的等待在主 goroutine 上是非阻塞的——这就是你想要做的吗?它不会等待在 main 中将 tasksWG 减为零,它会在您创建的 goroutine 中执行此操作。我不相信你想那样做?


如果您可以提供有关预期输出的更多详细信息,可能会更容易调试?

于 2021-12-29T18:54:07.283 回答