0
  1. 我们能否有多个 goroutine 监听多个通道,在打印所有问题时面临发布。

  2. 我无法打印所有数字如何改进这段代码

  3. 如果可能的话,任何人都可以提供一些例子,因为我正在努力解决这个例子。

  4. 每次去例行程序后是否需要 time.sleep


    package main
    
    import (
        "fmt"
        "strconv"
        "sync"
        "time"
    )
    
    var count string
    
    func worker3(var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var3 {
            count += ch + " "
        }
    }
    
    func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var2 {
            var3 <- ch
        }
    }
    
    func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var1 {
            var2 <- ch
        }
    }
    
    func main() {
        var1 := make(chan string, 1500)
        var2 := make(chan string, 1500)
        var3 := make(chan string, 1500)
    
        var wg sync.WaitGroup
        count = ""
        for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker1(var1, var2, var3, &wg)
        }
        for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker2(var2, var3, &wg)
        }
        for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker3(var3, &wg)
        }
    
        for i := 0; i <= 100000; i++ {
            var1 <- strconv.Itoa(i)
        }
        time.Sleep(time.Second)
        wg.Wait()
        fmt.Println(count)
    }

4

2 回答 2

2

让我们看看你的程序在做什么。您首先初始化了三个缓冲通道 var1、var2、var3

var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)

现在你初始化了一个 WaitGroup (wg)

var wg sync.WaitGroup

现在您定义了变量计数并且该变量是空字符串

count = "" 

下一部分是一个从 0 到 15 并生成 15 个 worker1 go 例程的循环

for i := 0; i < 15; i++ {
     time.Sleep(time.Second)
     wg.Add(1)
     go worker1(var1, var2, var3, &wg)
}

每次启动一个 worker1 时,执行例行程序并将通道和指针传递给 worker1 中的 waitgroup (wg)。

但是worker1会做什么呢?

func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var1 {
            var2 <- ch
        }
    }

worker1 将等待通道 var1 从该通道获取数据并将其传递给通道 var2。

这可以。你绝对不需要这个 time.Sleep(time.Second)。

接下来我们去吧。

您现在创建一个新循环,它将生成另外 15 个 go 例程 (worker2)。

for i := 0; i < 15; i++ {
    time.Sleep(time.Second)
    wg.Add(1)
    go worker2(var2, var3, &wg)
}

worker2 将从通道 var2 中获取所有内容并将其传递给通道 var3

func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for ch := range var2 {
            var3 <- ch
        }
    }

现在你为 worker3 创建了另外 15 个 go 例程。

for i := 0; i < 15; i++ {
            time.Sleep(time.Second)
            wg.Add(1)
            go worker3(var3, &wg)
        }

worker3 通过将通道 var3 附加到计数字符串来处理该数据的所有内容

最后一段代码是将数据播种到通道。该循环从 0 到 100000 并且对于每个数字将它们转换为字符串并将其传递给通道 var1

下一个程序将等待所有程序完成并打印结果。

好的,这段代码有一些问题。

  1. 在每个 goroutine 之前你绝对不需要这个 time.Sleep(time.Second),你也不需要在 wg.Wait() 之前的 time.Sleep。
  2. 这种类型的工作负载不需要缓冲通道。这是一个简单的管道,您可以使用无缓冲的通道,并且该通道将用于任务之间的同步。

当您更改代码以使用无缓冲通道并删除这些时间时。睡眠仍然有问题。问题是 go lang 运行时显示的错误:

fatal error: all goroutines are asleep - deadlock!

并终止代码。

但是为什么会发生这种情况,我们有sync.WaitGroup,一切看起来都很好。让我们看一个具有相同错误的更简单的程序。

package main

import (
    "log"
    "strconv"
    "sync"
)

func worker(var1 <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for e := range var1 {
        log.Printf("Element e %s ", e)
    }

}
func main() {
    var1 := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(var1, &wg)
    }

    for i := 0; i < 15; i++ {
        var1 <- strconv.Itoa(i)
    }

    wg.Wait()
}

此代码也会产生与您的代码相同的错误。这是因为这些通道永远不会关闭,go 例程(工作人员)将永远等待通道中的新数据。运行时检测并终止该进程。

为了防止这种类型的错误,我们需要添加一些机制来告诉 gorutine 我们已经完成并且 goroutine 可以停止在该通道上侦听并正确完成。

发送该信号的最简单方法是关闭该 goroutine 读取的通道。这是解决问题的代码。

package main

import (
    "log"
    "strconv"
    "sync"
)

func worker(var1 <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for e := range var1 {
        log.Printf("Element e %s ", e)
    }

}
func main() {
    var1 := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(var1, &wg)
    }

    for i := 0; i < 15; i++ {
        var1 <- strconv.Itoa(i)
    }
    close(var1)
    wg.Wait()
}

并且此代码不会产生错误。此代码将被正确终止。

但是有一个窍门。你怎么能在你的代码中做到这一点?从 var1 通道读取的 goroutine 有 15 个,从 var2 通道读取的 goroutine 有 15 个,从 var3 通道读取的 goroutine 有 15 个。

很难知道什么时候可以关闭哪个频道。我们知道通道 var1 首先处理数据,因此我们可以在生产者完成同步通道中的插入数据时关闭它们。原因是在读取之前的数据之前,我们不能将新数据插入通道。因此,当生产者插入所有数据时,我们知道通道 var1 上的所有数据都已处理,因此关闭通道是安全的。但是频道var2var3呢?

有 15 个不同的 goroutine 等待通道 var2 和 15 个等待 var3,这意味着我们需要找到一种方法在 var2 上的所有处理完成时关闭 var2(在所有 goroutines worker1 中),对于 var3 也是如此。这可以通过创建两个额外的 goroutine 来完成

wg1 和 wg2 并使用该 goroutine 为 worker1 和 worker2 生成 goroutine,这些 goroutine 将作为协调器工作,在这些函数内部,我们为 worker1 和 worker2 创建新的 sync.Group,这些函数将知道所有这些子 goroutine 的时间完成的。因此,对于 wg1,当所有这些 worker1 goroutine 都完成后,我们可以安全地关闭 var2 通道。wg2 和 var3 通道相同。

这些是 wg1 和 wg2 函数

// wg1
wg.Add(1)
go func() {
        log.Printf("Starting WG1 master go routine")
        var wg1 sync.WaitGroup
        defer func() {
            close(var2)
            wg.Done()
        }()
        for i := 0; i < 15; i++ {
            wg1.Add(1)
            go worker1(var1, var2, &wg1)
        }
        wg1.Wait()
    }()
// wg2
wg.Add(1)
go func() {
        log.Printf("Starting WG2 routine for second stage")
        defer func() {
            close(var3)
            wg.Done()
        }()
        var wg2 sync.WaitGroup
        for i := 0; i < 15; i++ {
            wg2.Add(1)
            go worker2(var2, var3, &wg2)
        }
        wg2.Wait()
    }()

您可以在以下位置找到完整的工作代码: 工作示例

于 2022-01-01T00:33:46.173 回答
1

是的,这很复杂,但是有一些经验法则可以让事情变得更加简单。

  • 更喜欢对传递给 go-routines 的通道使用形式参数,而不是访问全局范围内的通道。您可以通过这种方式获得更多的编译器检查,以及更好的模块化。
  • 避免在特定的 go-routine(包括“主”)中在同一通道上读取和写入。否则,死锁的风险要大得多。
于 2021-12-31T12:09:24.827 回答