8

我有一部分频道都收到相同的消息:

func broadcast(c <-chan string, chans []chan<- string) {
    for msg := range c {
        for _, ch := range chans {
            ch <- msg
        }
    }
}

但是,由于其中的每个频道chans都可能以不同的速率被读取,所以当我遇到一个缓慢的消费者时,我不想阻止其他频道。我已经用 goroutines 解决了这个问题:

func broadcast(c <-chan string, chans []chan<- string) {
    for msg := range c {
        for _, ch := range chans {
            go func() { ch <- msg }()
        }
    }
}

但是,传递到每个通道的消息的顺序很重要。我查看了规范以查看通道在被阻塞时是否保持顺序,我发现的只是:

如果容量大于零,则通道是异步的:如果缓冲区未满(发送)或非空(接收),则通信操作成功而不会阻塞,并且元素按发送顺序接收。

对我来说,如果写入被阻止,那么它不是“发送”,而是等待发送。有了这个假设,上面没有说明当多个 goroutine 被阻塞时的发送顺序。

通道畅通后,发送顺序是否有任何保证?

4

4 回答 4

8

不,没有任何保证。

即使通道未满,如果两个 goroutine 几乎同时启动并发送给它,我认为不能保证首先启动的 goroutine 会真正先执行。所以你不能指望消息按顺序到达。

于 2013-04-07T03:59:58.450 回答
4

如果通道已满,您可以删除消息(然后设置一个标志以暂停客户端并向他们发送消息,表明他们正在删除消息或其他内容)。

类似于(未经测试)的东西:

type Client struct {
    Name string
    ch   chan<-string
}

func broadcast(c <-chan string, chans []*Client) {
    for msg := range c {
        for _, ch := range chans {
            select {
            case ch.ch <- msg:
            // all okay
            default:
                log.Printf("Channel was full sending '%s' to client %s", msg, ch.Name)
            }
        }
    }
}
于 2013-04-07T04:35:03.737 回答
1

在此代码中,不作任何保证。

给定示例代码的主要问题不在于通道行为,而在于大量创建的 goroutines。所有的 goroutine 都在同一个叠层循环中“触发”,没有进一步的同步,所以即使在它们开始发送消息之前,我们根本不知道哪些会先执行。

然而,这通常会引发一个合理的问题:如果我们以某种方式保证几个阻塞发送指令的顺序,我们是否保证以相同的顺序接收它们?

发送的“之前发生”属性很难创建。我担心这是不可能的,因为:

  1. 在发送指令之前任何事情都可能发生:例如,其他 goroutine 是否执行自己的发送
  2. 在发送中被阻塞的 goroutine 不能同时管理其他类型的同步

例如,如果我有 10 个编号为 1 到 10 的 goroutine,我无法让它们以正确的顺序同时将自己的编号发送到通道。我所能做的就是使用各种顺序技巧,比如在 1 个单独的 goroutine 中进行排序。

于 2013-04-07T11:57:05.120 回答
0

这是对已发布答案的补充。

正如几乎所有人所说,问题在于 goroutine 的执行顺序,您可以通过传递要运行的 goroutine 的编号来轻松地使用通道来协调 goroutine 的执行:

func coordinated(coord chan int, num, max int, work func()) {
    for {
        n := <-coord

        if n == num {
            work()
            coord <- (n+1) % max
        } else {
            coord <- n
        }
    }
}

coord := make(chan int)

go coordinated(coord, 0, 3, func() { println("0"); time.Sleep(1 * time.Second) })
go coordinated(coord, 1, 3, func() { println("1"); time.Sleep(1 * time.Second) })
go coordinated(coord, 2, 3, func() { println("2"); time.Sleep(1 * time.Second) })

coord <- 0

或者通过使用以有序方式执行工作人员的中央 goroutine:

func executor(funs chan func()) {
    for {
        worker := <-funs
        worker()
        funs <- worker
    }
}

funs := make(chan func(), 3)

funs <- func() { println("0"); time.Sleep(1 * time.Second) }
funs <- func() { println("1"); time.Sleep(1 * time.Second) }
funs <- func() { println("2"); time.Sleep(1 * time.Second) }

go executor(funs)

当然,这些方法会由于同步而消除所有并行性。但是,您的程序的并发方面仍然存在。

于 2013-04-07T16:35:20.260 回答