5

我正在尝试实现一个简单的逻辑,其中Producer将数据发送到ch具有永久for循环的通道,而 Consumer 从通道读取ch

Producer 在通道上接收到信号时停止生产并退出永久循环quit

代码是这样的(另见这个游乐场

func main() {
    ch := make(chan int)
    quit := make(chan bool)
    var wg sync.WaitGroup
    wg.Add(1)
    go produce(ch, quit, &wg)
    go consume(ch)
    time.Sleep(1 * time.Millisecond)
    fmt.Println("CLOSE")
    close(quit)
    wg.Wait()
}

func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
    for i := 0; ; i++ {
        select {
        case <-quit:
            close(ch)
            fmt.Println("exit")
            wg.Done()
            return //we exit
        default:
            ch <- i
            fmt.Println("Producer sends", i)
        }
    }
}

func consume(ch chan int) {
    for {
        runtime.Gosched() // give the opportunity to the main goroutine to close the "quit" channel
        select {
        case i, more := <-ch:
            if !more {
                fmt.Println("exit consumer")
                return
            }
            fmt.Println("Consumer receives", i)
        }
    }
}

如果我在我的机器(一台 4 核的 Mac)上运行这段代码,一切正常。如果我在Go Playgroud上尝试相同的代码,它总是会超时。我猜这是因为 Go Playground 是单核,因此无限循环不会给其他 goroutine 运行的机会,但是我不明白为什么该指令runtime.Gosched()没有任何效果。

只是为了完成我所看到的图片,如果我GOMAXPROCS=1在我的 Mac 上设置,该程序仍然可以正常工作并按预期退出。如果我GOMAXPROCS=1在我的 Mac 上设置并删除runtime.Gosched()指令,行为会变得脆弱:有时程序会按预期终止,有时它似乎永远不会退出无限循环。

4

1 回答 1

4

您创建了一个不应该在实际程序中发生的病态情况,因此调度程序没有经过优化来处理这种情况。结合操场上的假时间实现,在超时之前你会获得太多的生产者和消费者循环。

生产者 goroutine 尽可能快地创建值,而消费者始终准备好接收它们。使用GOMAPXPROCS=1,调度程序在被迫抢占可用工作以检查主 goroutine 之前花费所有时间在两者之间来回切换,这比操场允许的时间更长。

如果我们为生产者-消费者对添加一些事情,我们可以限制他们必须垄断调度程序的时间。例如,向time.Sleep(time.Microsecond)消费者添加 a 将导致操场打印 1000 个值。这也显示了模拟时间在操场上的“准确”程度,因为对于处理每条消息需要非零时间的普通硬件来说,这是不可能的。

虽然是一个有趣的案例,但这对实际程序几乎没有影响。

一些注意事项,您可以range通过一个通道接收所有值,defer wg.Done如果可能,您应该始终在 goroutine 的开头,您可以在其中发送值,select case这允许您在发送未准备好时实际取消 for-select 循环,如果您想要“退出消费者”消息,您还需要将 发送WaitGroup给消费者。

https://play.golang.org/p/WyPmpY9pFl7

func main() {
    ch := make(chan int)
    quit := make(chan bool)
    var wg sync.WaitGroup
    wg.Add(2)
    go produce(ch, quit, &wg)
    go consume(ch, &wg)
    time.Sleep(50 * time.Microsecond)
    fmt.Println("CLOSE")
    close(quit)
    wg.Wait()
}

func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; ; i++ {
        select {
        case <-quit:
            close(ch)
            fmt.Println("exit")
            return
        case ch <- i:
            fmt.Println("Producer sends", i)
        }
    }
}

func consume(ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for i := range ch {
        fmt.Println("Consumer receives", i)
        time.Sleep(time.Microsecond)
    }
    
    fmt.Println("exit consumer")
    return
}
于 2020-09-27T14:50:01.573 回答