18

所以我看到了很多在 Go 中实现一个消费者和多个生产者的方法——Go 中的Concurrency 中的经典 fanIn 函数。

我想要的是一个扇出功能。它将一个通道作为参数,它从中读取一个值,并返回一个通道切片,并将该值的副本写入该通道。

是否有正确/推荐的实施方式?

4

4 回答 4

20

您几乎描述了最好的方法,但这里有一个小的代码示例。

去游乐场: https: //play.golang.org/p/jwdtDXVHJk

package main

import (
    "fmt"
    "time"
)

func producer(iters int) <-chan int {
    c := make(chan int)
    go func() {
        for i := 0; i < iters; i++ {
            c <- i
            time.Sleep(1 * time.Second)
        }
        close(c)
    }()
    return c
}

func consumer(cin <-chan int) {
    for i := range cin {
        fmt.Println(i)
    }
}

func fanOut(ch <-chan int, size, lag int) []chan int {
    cs := make([]chan int, size)
    for i, _ := range cs {
        // The size of the channels buffer controls how far behind the recievers
        // of the fanOut channels can lag the other channels.
        cs[i] = make(chan int, lag)
    }
    go func() {
        for i := range ch {
            for _, c := range cs {
                c <- i
            }
        }
        for _, c := range cs {
            // close all our fanOut channels when the input channel is exhausted.
            close(c)
        }
    }()
    return cs
}

func fanOutUnbuffered(ch <-chan int, size int) []chan int {
    cs := make([]chan int, size)
    for i, _ := range cs {
        // The size of the channels buffer controls how far behind the recievers
        // of the fanOut channels can lag the other channels.
        cs[i] = make(chan int)
    }
    go func() {
        for i := range ch {
            for _, c := range cs {
                c <- i
            }
        }
        for _, c := range cs {
            // close all our fanOut channels when the input channel is exhausted.
            close(c)
        }
    }()
    return cs
}

func main() {
    c := producer(10)
    chans := fanOutUnbuffered(c, 3)
    go consumer(chans[0])
    go consumer(chans[1])
    consumer(chans[2])
}

需要注意的重要部分是一旦输入通道耗尽,我们如何关闭输出通道。此外,如果其中一个输出通道阻塞发送,它将阻止其他输出通道的发送。我们通过设置通道的缓冲区大小来控制延迟量。

于 2013-06-05T04:02:58.200 回答
2

下面的这个解决方案有点做作,但它对我有用:

package main

import (
    "fmt"
    "time"
    "crypto/rand"
    "encoding/binary"
)

func handleNewChannels(arrchangen chan [](chan uint32),
                       intchangen chan (chan uint32)) {
    currarr := []chan uint32{}
    arrchangen <- currarr
    for {
        newchan := <-intchangen
        currarr = append(currarr, newchan)
        arrchangen <- currarr
    }
}

func sendToChannels(arrchangen chan [](chan uint32)) {
    tick := time.Tick(1 * time.Second)
    currarr := <-arrchangen
    for {
        select {
        case <-tick:
            sent := false
            var n uint32
            binary.Read(rand.Reader, binary.LittleEndian, &n)
            for i := 0 ; i < len(currarr) ; i++ {
                currarr[i] <- n
                sent = true
            }
            if sent {
                fmt.Println("Sent generated ", n)
            }
        case newarr := <-arrchangen:
            currarr = newarr
        }
    }
}
func handleChannel(tchan chan uint32) {
    for {
        val := <-tchan
        fmt.Println("Got the value ", val)
    }
}

func createChannels(intchangen chan (chan uint32)) {
    othertick := time.Tick(5 * time.Second)
    for {
        <-othertick
        fmt.Println("Creating new channel! ")
        newchan := make(chan uint32)
        intchangen <- newchan
        go handleChannel(newchan)
    }
}

func main() {
    arrchangen := make(chan [](chan uint32))
    intchangen := make(chan (chan uint32))
    go handleNewChannels(arrchangen, intchangen)
    go sendToChannels(arrchangen)
    createChannels(intchangen)
}
于 2014-04-04T14:54:24.337 回答
0

首先,请参阅相关问题Go 中生产者/消费者最简洁的成语是什么?一个线程对另一个线程(消费者/生产者)表现出兴趣。另外,看看生产者-消费者问题。关于并发查看如何在 Google Go 中实现并发

于 2013-06-05T02:08:28.900 回答
0

我们可以处理多个消费者,而无需为每个消费者复制通道数据。

去游乐场: https: //play.golang.org/p/yOKindnqiZv

package main

import (
    "fmt"
    "sync"
)

type data struct {
    msg string
    consumers int
}

func main() {
    ch := make(chan *data) // both block or non-block are ok
    var wg sync.WaitGroup
    consumerCount := 3 // specify no. of consumers

    producer := func() {
        obj := &data {
            msg: "hello everyone!",
            consumers: consumerCount,
        }
        ch <- obj
    }
    consumer := func(idx int) {
        defer wg.Done()
        obj := <-ch
        fmt.Printf("consumer %d received data %v\n", idx, obj)
        obj.consumers--
        if obj.consumers > 0 {
            ch <- obj // forward to others
        } else {
            fmt.Printf("last receiver: %d\n", idx)
        }
    }

    go producer()
    for i:=1; i<=consumerCount; i++ {
        wg.Add(1)
        go consumer(i)
    }

    wg.Wait()
}
于 2020-06-16T13:33:42.583 回答