100

我有多个 goroutines 试图同时在同一个频道上接收。似乎最后一个开始在通道上接收的 goroutine 获得了值。这是语言规范中的某个地方还是未定义的行为?

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        <-c
        c <- fmt.Sprintf("goroutine %d", i)
    }(i)
}
c <- "hi"
fmt.Println(<-c)

输出:

goroutine 4

操场上的例子

编辑:

我才意识到这比我想象的要复杂。消息在所有 goroutine 中传递。

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        msg := <-c
        c <- fmt.Sprintf("%s, hi from %d", msg, i)
    }(i)
}
c <- "original"
fmt.Println(<-c)

输出:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

注意: 上述输出在最新版本的 Go 中已经过时(见评论)

操场上的例子

4

6 回答 6

85

是的,这很复杂,但是有一些经验法则可以让事情变得更加简单。

  • 更喜欢对传递给 go-routines 的通道使用形式参数,而不是访问全局范围内的通道。您可以通过这种方式获得更多的编译器检查,以及更好的模块化。
  • 避免在特定的 go-routine(包括“主”)中在同一通道上读取和写入。否则,死锁的风险要大得多。

这是您的程序的替代版本,应用了这两个准则。这个案例展示了一个频道上有很多作者和一个读者:

c := make(chan string)

for i := 1; i <= 5; i++ {
    go func(i int, co chan<- string) {
        for j := 1; j <= 5; j++ {
            co <- fmt.Sprintf("hi from %d.%d", i, j)
        }
    }(i, c)
}

for i := 1; i <= 25; i++ {
    fmt.Println(<-c)
}

http://play.golang.org/p/quQn7xePLw

它创建了五个 go-routines 写入单个通道,每个通道写入五次。主要的 go-routine 读取所有 25 条消息 - 你可能会注意到它们出现的顺序通常不是连续的(即并发性很明显)。

这个例子展示了 Go 通道的一个特性:可以有多个 writer 共享一个通道;Go 会自动交错消息。

这同样适用于一个通道上的一个写入器和多个读取器,如这里的第二个示例所示:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i <= 5; i++ {
    go func(i int, ci <-chan int) {
        j := 1
        for v := range ci {
            time.Sleep(time.Millisecond)
            fmt.Printf("%d.%d got %d\n", i, j, v)
            j += 1
        }
        w.Done()
    }(i, c)
}

for i := 1; i <= 25; i++ {
    c <- i
}
close(c)
w.Wait()

第二个示例包括对主 goroutine 施加的等待,否则它将立即退出并导致其他五个 goroutine 提前终止(感谢olov的更正)

在这两个示例中,都不需要缓冲。将缓冲仅视为性能增强器通常是一个很好的原则。如果您的程序在没有缓冲区的情况下不会死锁,那么它也不会因缓冲区死锁(但反之亦然)。因此,作为另一个经验法则,开始时不要缓冲,然后根据需要添加

于 2013-03-30T17:30:15.747 回答
29

回复晚了,但我希望这对以后的其他人有所帮助,例如长轮询,“全球”按钮,向所有人广播?

Effective Go解释了这个问题:

接收器总是阻塞,直到有数据要接收。

这意味着您不能让超过 1 个 goroutine 监听 1 个通道并期望所有 goroutine 接收相同的值。

运行此代码示例

package main

import "fmt"

func main() {
    c := make(chan int)

    for i := 1; i <= 5; i++ {
        go func(i int) {
        for v := range c {
                fmt.Printf("count %d from goroutine #%d\n", v, i)
            }
        }(i)
    }

    for i := 1; i <= 25; i++ {
        c<-i
    }

    close(c)
}

即使有 5 个 goroutine 在监听通道,您也不会多次看到“count 1”。这是因为当第一个 goroutine 阻塞通道时,所有其他 goroutine 必须排队等待。当通道被解除阻塞时,计数已经被接收并从通道中删除,所以下一个 goroutine 获得下一个计数值。

于 2013-11-06T17:16:07.573 回答
7

这很复杂。

另外,看看会发生什么GOMAXPROCS = NumCPU+1。例如,

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU() + 1)
    fmt.Print(runtime.GOMAXPROCS(0))
    c := make(chan string)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- ", original"
    fmt.Println(<-c)
}

输出:

5, original, hi from 4

并且,看看缓冲通道会发生什么。例如,

package main

import "fmt"

func main() {
    c := make(chan string, 5+1)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- "original"
    fmt.Println(<-c)
}

输出:

original

您也应该能够解释这些情况。

于 2013-03-30T06:55:36.053 回答
7

我研究了现有的解决方案并创建了简单的广播库https://github.com/grafov/bcast

    group := bcast.NewGroup() // you created the broadcast group
    go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members

    member := group.Join() // then you join member(s) from other goroutine(s)
    member.Send("test message") // or send messages of any type to the group 

    member1 := group.Join() // then you join member(s) from other goroutine(s)
    val := member1.Recv() // and for example listen for messages
于 2013-12-22T21:31:34.503 回答
2

对于一个通道上的多个 goroutine 监听,是的,这是可能的。关键是消息本身,你可以定义一些这样的消息:

package main

import (
    "fmt"
    "sync"
)

type obj struct {
    msg string
    receiver int
}

func main() {
    ch := make(chan *obj) // both block or non-block are ok
    var wg sync.WaitGroup
    receiver := 25 // specify receiver count

    sender := func() {
        o := &obj {
            msg: "hello everyone!",
            receiver: receiver,
        }
        ch <- o
    }
    recv := func(idx int) {
        defer wg.Done()
        o := <-ch
        fmt.Printf("%d received at %d\n", idx, o.receiver)
        o.receiver--
        if o.receiver > 0 {
            ch <- o // forward to others
        } else {
            fmt.Printf("last receiver: %d\n", idx)
        }
    }

    go sender()
    for i:=0; i<reciever; i++ {
        wg.Add(1)
        go recv(i)
    }

    wg.Wait()
}

输出是随机的:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver 4
于 2017-07-08T02:07:17.323 回答
0

一个很老的问题,但我想没有人提到这个。

首先,如果您多次运行代码,两个示例的输出可能会有所不同。这与 Go 版本无关。

第一个示例的输出可以是goroutine 4, goroutine 0, goroutine 1,... 实际上所有的 goroutine 都可以是一个将字符串发送到主 goroutine 的人。

主协程是协程之一,所以它也在等待来自通道的数据。哪个 goroutine 应该接收数据?没人知道。它不在语言规范中。

此外,第二个示例的输出也可以是任何内容:

(为了清楚起见,我添加了方括号)

// [original, hi from 4]
// [[[[[original, hi from 4], hi from 0], hi from 2], hi from 1], hi from 3]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 2], hi from 3]
// [[[[[original, hi from 0], hi from 2], hi from 1], hi from 3], hi from 4]
// [[original, hi from 4], hi from 1]
// [[original, hi from 0], hi from 4]
// [[[original, hi from 4], hi from 1], hi from 0]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 3], hi from 2]
// [[[[original, hi from 0], hi from 2], hi from 1], hi from 3]
//
// ......anything can be the output.

这不是魔法,也不是神秘现象。

If there are multiple threads being executed, no one knows exactly which thread will acquire the resource. The language doesn't determine it. Rather, OS takes care of it. This is why multithread programming is quite complicated.

Goroutine is not OS thread, but it behaves somewhat similarly.

于 2022-02-17T08:38:05.813 回答