所以我看到了很多在 Go 中实现一个消费者和多个生产者的方法——Go 中的Concurrency 中的经典 fanIn 函数。
我想要的是一个扇出功能。它将一个通道作为参数,它从中读取一个值,并返回一个通道切片,并将该值的副本写入该通道。
是否有正确/推荐的实施方式?
所以我看到了很多在 Go 中实现一个消费者和多个生产者的方法——Go 中的Concurrency 中的经典 fanIn 函数。
我想要的是一个扇出功能。它将一个通道作为参数,它从中读取一个值,并返回一个通道切片,并将该值的副本写入该通道。
是否有正确/推荐的实施方式?
您几乎描述了最好的方法,但这里有一个小的代码示例。
去游乐场: https: //play.golang.org/p/jwdtDXVHJk
package main
import (
"fmt"
"time"
)
func producer(iters int) <-chan int {
c := make(chan int)
go func() {
for i := 0; i < iters; i++ {
c <- i
time.Sleep(1 * time.Second)
}
close(c)
}()
return c
}
func consumer(cin <-chan int) {
for i := range cin {
fmt.Println(i)
}
}
func fanOut(ch <-chan int, size, lag int) []chan int {
cs := make([]chan int, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan int, lag)
}
go func() {
for i := range ch {
for _, c := range cs {
c <- i
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func fanOutUnbuffered(ch <-chan int, size int) []chan int {
cs := make([]chan int, size)
for i, _ := range cs {
// The size of the channels buffer controls how far behind the recievers
// of the fanOut channels can lag the other channels.
cs[i] = make(chan int)
}
go func() {
for i := range ch {
for _, c := range cs {
c <- i
}
}
for _, c := range cs {
// close all our fanOut channels when the input channel is exhausted.
close(c)
}
}()
return cs
}
func main() {
c := producer(10)
chans := fanOutUnbuffered(c, 3)
go consumer(chans[0])
go consumer(chans[1])
consumer(chans[2])
}
需要注意的重要部分是一旦输入通道耗尽,我们如何关闭输出通道。此外,如果其中一个输出通道阻塞发送,它将阻止其他输出通道的发送。我们通过设置通道的缓冲区大小来控制延迟量。
下面的这个解决方案有点做作,但它对我有用:
package main
import (
"fmt"
"time"
"crypto/rand"
"encoding/binary"
)
func handleNewChannels(arrchangen chan [](chan uint32),
intchangen chan (chan uint32)) {
currarr := []chan uint32{}
arrchangen <- currarr
for {
newchan := <-intchangen
currarr = append(currarr, newchan)
arrchangen <- currarr
}
}
func sendToChannels(arrchangen chan [](chan uint32)) {
tick := time.Tick(1 * time.Second)
currarr := <-arrchangen
for {
select {
case <-tick:
sent := false
var n uint32
binary.Read(rand.Reader, binary.LittleEndian, &n)
for i := 0 ; i < len(currarr) ; i++ {
currarr[i] <- n
sent = true
}
if sent {
fmt.Println("Sent generated ", n)
}
case newarr := <-arrchangen:
currarr = newarr
}
}
}
func handleChannel(tchan chan uint32) {
for {
val := <-tchan
fmt.Println("Got the value ", val)
}
}
func createChannels(intchangen chan (chan uint32)) {
othertick := time.Tick(5 * time.Second)
for {
<-othertick
fmt.Println("Creating new channel! ")
newchan := make(chan uint32)
intchangen <- newchan
go handleChannel(newchan)
}
}
func main() {
arrchangen := make(chan [](chan uint32))
intchangen := make(chan (chan uint32))
go handleNewChannels(arrchangen, intchangen)
go sendToChannels(arrchangen)
createChannels(intchangen)
}
首先,请参阅相关问题Go 中生产者/消费者最简洁的成语是什么?一个线程对另一个线程(消费者/生产者)表现出兴趣。另外,看看生产者-消费者问题。关于并发查看如何在 Google Go 中实现并发。
我们可以处理多个消费者,而无需为每个消费者复制通道数据。
去游乐场: https: //play.golang.org/p/yOKindnqiZv
package main
import (
"fmt"
"sync"
)
type data struct {
msg string
consumers int
}
func main() {
ch := make(chan *data) // both block or non-block are ok
var wg sync.WaitGroup
consumerCount := 3 // specify no. of consumers
producer := func() {
obj := &data {
msg: "hello everyone!",
consumers: consumerCount,
}
ch <- obj
}
consumer := func(idx int) {
defer wg.Done()
obj := <-ch
fmt.Printf("consumer %d received data %v\n", idx, obj)
obj.consumers--
if obj.consumers > 0 {
ch <- obj // forward to others
} else {
fmt.Printf("last receiver: %d\n", idx)
}
}
go producer()
for i:=1; i<=consumerCount; i++ {
wg.Add(1)
go consumer(i)
}
wg.Wait()
}