0

在下面的代码中,我不明白为什么“Worker”方法似乎退出而不是从输入通道“in”中提取值并处理它们。

我曾假设他们只会在消耗来自输入通道“in”的所有输入并处理它们之后才会返回

package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    i   int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    for item := range in {
        item *= item // returns the square of the input value
        fmt.Printf("=> %d: %d\n", id, item)
        out <- Result{item, id}
    }
    wg.Done()
    fmt.Printf("%d exiting ", id)
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    wg := sync.WaitGroup{}
    for id := 0; id < n_workers; id++ {
        fmt.Printf("Starting : %d\n", id)
        wg.Add(1)
        go Worker(in, out, id, &wg)
    }
    wg.Wait()  // wait for all workers to complete their tasks
    close(out) // close the output channel when all tasks are completed
}

const (
    NW = 4
)

func main() {
    in := make(chan int)
    out := make(chan Result)

    go func() {
        for i := 0; i < 100; i++ {
            in <- i
        }
        close(in)
    }()
    Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out : %d: %d", item.i, item.val)
    }
}


输出是

Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!
4

3 回答 3

3

致命错误:所有 goroutine 都处于休眠状态 - 死锁!

完整的错误显示了每个 goroutine “卡住”的位置。 如果你在操场上运行它,它甚至会显示你的行号。这让我很容易诊断。

Run_parallelmaingroutine 中运行,因此在main可以读取之前outRun_parallel必须返回。在Run_parallel可以返回之前,它必须wg.Wait()。但在工人打电话之前wg.Done(),他们必须写信给out。这就是导致僵局的原因。

一种解决方案很简单:只需Run_parallel在自己的 Goroutine 中并发运行。

    go Run_parallel(NW, in, out, Worker)

现在,mainrange over out,等待outs 关闭以发出完成信号。 Run_parallel等待工人与wg.Wait(),工人将范围内in。所有的工作都会完成,并且在完成之前程序不会结束。(https://go.dev/play/p/oMrgH2U09tQ

于 2021-12-02T15:16:57.953 回答
-1

解决方案的替代配方:

在那个替代公式中,没有必要将 Run_parallel 作为 goroutine 启动(它会触发自己的 goroutine)。我更喜欢第二种解决方案,因为它自动执行 Run_parallel() 必须与主函数并行运行的事实。此外,出于同样的原因,它更安全,更不容易出错(无需记住使用 go 关键字运行 Run_parallel)。

package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    id  int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        item *= 2 // returns the double of the input value (Bogus handling of data)
        out <- Result{id, item}
    }
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    go func() {
        wg := sync.WaitGroup{}
        defer close(out) // close the output channel when all tasks are completed
        for id := 0; id < n_workers; id++ {
            wg.Add(1)
            go Worker(in, out, id, &wg)
        }
        wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)
    }()
}

const (
    NW = 8
)

func main() {

    in := make(chan int)
    out := make(chan Result)

    go func() {
        defer close(in)
        for i := 0; i < 10; i++ {
            in <- i
        }
    }()

    Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out [%d]: %d\n", item.id, item.val)
    }

    println("- - - All done - - -")
}

于 2021-12-02T18:22:20.547 回答
-1

解决方案 :

Run_parallel 必须在它自己的 goroutine 中运行:

package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    id  int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        item *= 2 // returns the double of the input value (Bogus handling of data)
        out <- Result{id, item}
    }
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    wg := sync.WaitGroup{}
    for id := 0; id < n_workers; id++ {
        wg.Add(1)
        go Worker(in, out, id, &wg)
    }
    wg.Wait()  // wait for all workers to complete their tasks
    close(out) // close the output channel when all tasks are completed
}

const (
    NW = 8
)

func main() {

    in := make(chan int)
    out := make(chan Result)

    go func() {
        for i := 0; i < 10; i++ {
            in <- i
        }
        close(in)
    }()

    go Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out [%d]: %d\n", item.id, item.val)
    }

    println("- - - All done - - -")

}

于 2021-12-02T15:31:12.867 回答