让我们看看你的程序在做什么。您首先初始化了三个缓冲通道 var1、var2、var3
var1 := make(chan string, 1500)
var2 := make(chan string, 1500)
var3 := make(chan string, 1500)
现在你初始化了一个 WaitGroup (wg)
var wg sync.WaitGroup
现在您定义了变量计数并且该变量是空字符串
count = ""
下一部分是一个从 0 到 15 并生成 15 个 worker1 go 例程的循环
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker1(var1, var2, var3, &wg)
}
每次启动一个 worker1 时,执行例行程序并将通道和指针传递给 worker1 中的 waitgroup (wg)。
但是worker1会做什么呢?
func worker1(var1 chan string, var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var1 {
var2 <- ch
}
}
worker1 将等待通道 var1 从该通道获取数据并将其传递给通道 var2。
这可以。你绝对不需要这个 time.Sleep(time.Second)。
接下来我们去吧。
您现在创建一个新循环,它将生成另外 15 个 go 例程 (worker2)。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker2(var2, var3, &wg)
}
worker2 将从通道 var2 中获取所有内容并将其传递给通道 var3
func worker2(var2 chan string, var3 chan string, wg *sync.WaitGroup) {
defer wg.Done()
for ch := range var2 {
var3 <- ch
}
}
现在你为 worker3 创建了另外 15 个 go 例程。
for i := 0; i < 15; i++ {
time.Sleep(time.Second)
wg.Add(1)
go worker3(var3, &wg)
}
worker3 通过将通道 var3 附加到计数字符串来处理该数据的所有内容
最后一段代码是将数据播种到通道。该循环从 0 到 100000 并且对于每个数字将它们转换为字符串并将其传递给通道 var1
下一个程序将等待所有程序完成并打印结果。
好的,这段代码有一些问题。
- 在每个 goroutine 之前你绝对不需要这个 time.Sleep(time.Second),你也不需要在 wg.Wait() 之前的 time.Sleep。
- 这种类型的工作负载不需要缓冲通道。这是一个简单的管道,您可以使用无缓冲的通道,并且该通道将用于任务之间的同步。
当您更改代码以使用无缓冲通道并删除这些时间时。睡眠仍然有问题。问题是 go lang 运行时显示的错误:
fatal error: all goroutines are asleep - deadlock!
并终止代码。
但是为什么会发生这种情况,我们有sync.WaitGroup,一切看起来都很好。让我们看一个具有相同错误的更简单的程序。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
wg.Wait()
}
此代码也会产生与您的代码相同的错误。这是因为这些通道永远不会关闭,go 例程(工作人员)将永远等待通道中的新数据。运行时检测并终止该进程。
为了防止这种类型的错误,我们需要添加一些机制来告诉 gorutine 我们已经完成并且 goroutine 可以停止在该通道上侦听并正确完成。
发送该信号的最简单方法是关闭该 goroutine 读取的通道。这是解决问题的代码。
package main
import (
"log"
"strconv"
"sync"
)
func worker(var1 <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for e := range var1 {
log.Printf("Element e %s ", e)
}
}
func main() {
var1 := make(chan string)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go worker(var1, &wg)
}
for i := 0; i < 15; i++ {
var1 <- strconv.Itoa(i)
}
close(var1)
wg.Wait()
}
并且此代码不会产生错误。此代码将被正确终止。
但是有一个窍门。你怎么能在你的代码中做到这一点?从 var1 通道读取的 goroutine 有 15 个,从 var2 通道读取的 goroutine 有 15 个,从 var3 通道读取的 goroutine 有 15 个。
很难知道什么时候可以关闭哪个频道。我们知道通道 var1 首先处理数据,因此我们可以在生产者完成同步通道中的插入数据时关闭它们。原因是在读取之前的数据之前,我们不能将新数据插入通道。因此,当生产者插入所有数据时,我们知道通道 var1 上的所有数据都已处理,因此关闭通道是安全的。但是频道var2和var3呢?
有 15 个不同的 goroutine 等待通道 var2 和 15 个等待 var3,这意味着我们需要找到一种方法在 var2 上的所有处理完成时关闭 var2(在所有 goroutines worker1 中),对于 var3 也是如此。这可以通过创建两个额外的 goroutine 来完成
wg1 和 wg2 并使用该 goroutine 为 worker1 和 worker2 生成 goroutine,这些 goroutine 将作为协调器工作,在这些函数内部,我们为 worker1 和 worker2 创建新的 sync.Group,这些函数将知道所有这些子 goroutine 的时间完成的。因此,对于 wg1,当所有这些 worker1 goroutine 都完成后,我们可以安全地关闭 var2 通道。wg2 和 var3 通道相同。
这些是 wg1 和 wg2 函数
// wg1
wg.Add(1)
go func() {
log.Printf("Starting WG1 master go routine")
var wg1 sync.WaitGroup
defer func() {
close(var2)
wg.Done()
}()
for i := 0; i < 15; i++ {
wg1.Add(1)
go worker1(var1, var2, &wg1)
}
wg1.Wait()
}()
// wg2
wg.Add(1)
go func() {
log.Printf("Starting WG2 routine for second stage")
defer func() {
close(var3)
wg.Done()
}()
var wg2 sync.WaitGroup
for i := 0; i < 15; i++ {
wg2.Add(1)
go worker2(var2, var3, &wg2)
}
wg2.Wait()
}()
您可以在以下位置找到完整的工作代码:
工作示例