14

我想做的是拥有一组生产者 goroutine(其中一些可能完成也可能不完成)和一个消费者例程。问题在于括号中的警告 - 我们不知道将返回答案的总数。

所以我想做的是:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int) {
  // May or may not produce.
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // If we include a close, then that's WRONG. Chan will be closed
  // but a producer will try to write to it. Runtime error.
  close(c)

  // If we don't close, then that's WRONG. All goroutines will
  // deadlock, since the range keyword will look for a close.
  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

所以问题是,如果我关闭它是错误的,如果我不关闭 - 它仍然是错误的(参见代码中的注释)。

现在,解决方案将是一个带外信号通道,所有生产者都写入:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  signal <- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // This is basically a 'join'.
  num_done := 0
  for num_done < 10 {
    <- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

这完全符合我的要求!但对我来说,这似乎是一口。我的问题是:是否有任何成语/技巧可以让我以更简单的方式做类似的事情?

我在这里看了一下:http: //golang.org/doc/codewalk/sharemem/ 似乎completechan (在开头初始化main)在一个范围内使用但从未关闭。我不明白怎么做。

如果有人有任何见解,我将不胜感激。干杯!


编辑:fls0815 有答案,并且还回答了无关闭通道范围如何工作的问题。

我上面的代码修改为工作(在 fls0815 提供代码之前完成):

package main

import (
  "fmt"
  "math/rand"
  "sync"
)

var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup

func producer(c chan int) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  wg_prod.Done()
}

func main() {
  c := make(chan int, 10)
  wg_prod.Add(10)
  for i := 0; i < 10; i++ {
    go producer(c)
  }

  wg_cons.Add(1)
  go func() {
    for num := range c {
      fmt.Printf("Producer produced: %d\n", num)
    }
    wg_cons.Done()
  } ()

  wg_prod.Wait()
  close(c)
  wg_cons.Wait()
  fmt.Println("All done.")
}
4

5 回答 5

16

只有生产者才能关闭频道。range您可以通过调用消费者来实现您的目标,消费者在您的生产者启动后在结果通道上迭代 ( )。在您的主线程中,您等待(参见 参考资料sync.WaitGroup)直到您的消费者/生产者完成他们的工作。生产者完成后,您关闭生成的通道,这将迫使您的消费者退出(range当通道关闭且没有缓冲项目时将退出)。

示例代码:

package main

import (
    "log"
    "sync"
    "time"
    "math/rand"
    "runtime"
)

func consumer() {
    defer consumer_wg.Done()

    for item := range resultingChannel {
        log.Println("Consumed:", item)
    }
}

func producer() {
    defer producer_wg.Done()

    success := rand.Float32() > 0.5
    if success {
        resultingChannel <- rand.Int()
    }
}

var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup

func main() {
    rand.Seed(time.Now().Unix())

    for c := 0; c < runtime.NumCPU(); c++ {
        producer_wg.Add(1)  
        go producer()
    }

    for c := 0; c < runtime.NumCPU(); c++ {
        consumer_wg.Add(1)
        go consumer()
    }

    producer_wg.Wait()

    close(resultingChannel)

    consumer_wg.Wait()
}

我将close- 语句放入 main 函数的原因是因为我们有多个生产者。在上面的示例中关闭一个生产者中的通道会导致您已经遇到的问题(在关闭的通道上写入;原因是可能剩下一个生产者仍在生产数据)。只有在没有生产者时才应关闭频道(因此我建议仅由生产者关闭频道)。这就是在 Go 中构建通道的方式。在这里,您将找到有关关闭频道的更多信息。


与 sharemem 示例相关:AFAICS 此示例通过一次又一次地重新排队资源(从挂起 -> 完成 -> 挂起 -> 完成......等等)来无限运行。这就是 main-func 末尾的迭代所做的。它接收完成的资源并使用 Resource.Sleep() 将它们重新排队以待处理。当没有完成的资源时,它会等待并阻塞新资源的完成。因此无需关闭通道,因为它们一直在使用。

于 2012-06-18T00:51:16.327 回答
1

总是有很多方法可以解决这些问题。这是使用 Go 中基础的简单同步通道的解决方案。没有缓冲通道,没有关闭通道,没有 WaitGroups。

它离你的“满口”解决方案真的不远,而且——很抱歉让你失望——并没有那么小。它确实将消费者放在自己的 goroutine 中,以便消费者可以在生产者生产数字时消费数字。它还区分了生产“尝试”可以以成功或失败告终。如果生产失败,则立即进行尝试。如果成功,则在使用该数字之前不会进行尝试。

package main

import (
    "fmt"
    "math/rand"
)

func producer(c chan int, fail chan bool) {
    if success := rand.Float32() > 0.5; success {
        c <- rand.Int()
    } else {
        fail <- true
    }
}

func consumer(c chan int, success chan bool) {
    for {
        num := <-c
        fmt.Printf("Producer produced: %d\n", num)
        success <- true
    }
}

func main() {
    const nTries = 10
    c := make(chan int)
    done := make(chan bool)
    for i := 0; i < nTries; i++ {
        go producer(c, done)
    }
    go consumer(c, done)

    for i := 0; i < nTries; i++ {
        <-done
    }
    fmt.Println("All done.")
}
于 2012-06-18T04:47:32.060 回答
0

我之所以添加这个,是因为现有的答案并没有使一些事情变得清楚。首先,codewalk 示例中的范围循环只是一个无限事件循环,它会不断地重新检查和更新同一个 url 列表。

接下来,通道本身已经Go 中惯用的消费者-生产者队列。支持通道的异步缓冲区的大小决定了生产者在获得背压之前可以生产多少。在下面设置 N = 0 以查看锁步生产者消费者,而没有任何人领先或落后。事实上,N = 10 将让生产者在阻塞之前生产多达 10 个产品。

最后,在 Go 中编写通信顺序进程有一些很好的习惯用法(例如,为您启动 go 例程的函数,以及使用 for/select 模式来通信和接受控制命令)。我认为 WaitGroups 很笨拙,而是希望看到惯用的示例。

package main

import (
    "fmt"
    "time"
)

type control int
const  (
    sleep control = iota
    die // receiver will close the control chan in response to die, to ack.
)

func (cmd control) String() string {
    switch cmd {
    case sleep: return "sleep"
    case die: return "die"
    }
    return fmt.Sprintf("%d",cmd)
}

func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) {
    var product int
    go func() {
        for {
            select {
        case writechan <- product:
            fmt.Printf("Producer produced %v\n", product)
            product++
        case cmd:= <- ctrl:
            fmt.Printf("Producer got control cmd: %v\n", cmd)
            switch cmd {
            case sleep:
                fmt.Printf("Producer sleeping 2 sec.\n")
                time.Sleep(2000 * time.Millisecond)
            case die:
                fmt.Printf("Producer dies.\n")
                close(done)
                return
            }
            }
        }
    }()
}

func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) {
    go func() {
        var product int
        for {
            select {
            case product = <-readchan:
                fmt.Printf("Consumer consumed %v\n", product)
            case cmd:= <- ctrl:
                fmt.Printf("Consumer got control cmd: %v\n", cmd)
                switch cmd {
                case sleep:
                    fmt.Printf("Consumer sleeping 2 sec.\n")
                    time.Sleep(2000 * time.Millisecond)
                case die:
                    fmt.Printf("Consumer dies.\n")
                    close(done)
                    return
                }

            }
        }
    }()
}

func main() {

    N := 10
    q := make(chan int, N)

    prodCtrl := make(chan control)
    consCtrl := make(chan control)

    prodDone := make(chan bool)
    consDone := make(chan bool)


    ProduceTo(q, prodCtrl, prodDone)
    ConsumeFrom(q, consCtrl, consDone)

    // wait for a moment, to let them produce and consume
    timer := time.NewTimer(10 * time.Millisecond)
    <-timer.C

    // tell producer to pause
    fmt.Printf("telling producer to pause\n")
    prodCtrl <- sleep

    // wait for a second
    timer = time.NewTimer(1 * time.Second)
    <-timer.C

    // tell consumer to pause
    fmt.Printf("telling consumer to pause\n")
    consCtrl <- sleep


    // tell them both to finish
    prodCtrl <- die
    consCtrl <- die

    // wait for that to actually happen
    <-prodDone
    <-consDone
}
于 2014-05-09T01:52:39.433 回答
0

如果您使用带有 fanIn 功能的生成器模式,您可以使用没有等待组的简单无缓冲通道。

在生成器模式中,每个生产者返回一个通道并负责关闭它。然后 fanIn 函数遍历这些通道并将返回的值转发到它返回的单个通道。

当然,问题是 fanIn 函数在每个通道关闭时转发通道类型(int)的零值。

您可以通过使用通道类型的零值作为标记值来解决此问题,并且仅在它们不是零值时才使用来自 fanIn 通道的结果。

这是一个例子:

package main

import (
    "fmt"
    "math/rand"
)

const offset = 1

func producer() chan int {
    cout := make(chan int)
    go func() {
        defer close(cout)
        // May or may not produce.
        success := rand.Float32() > 0.5
        if success {
            cout <- rand.Int() + offset
        }
    }()
    return cout
}

func fanIn(cin []chan int) chan int {
    cout := make(chan int)
    go func() {
        defer close(cout)
        for _, c := range cin {
            cout <- <-c
        }
    }()
    return cout
}

func main() {
    chans := make([]chan int, 0)
    for i := 0; i < 10; i++ {
        chans = append(chans, producer())
    }

    for num := range fanIn(chans) {
        if num > offset {
            fmt.Printf("Producer produced: %d\n", num)
        }
    }
    fmt.Println("All done.")
}
于 2016-06-10T10:31:31.743 回答
0

producer-consumer 是一种常见的模式,为了方便仔细处理chan 通信,我编写了一个库prosumer 。例如:

func main() {
    maxLoop := 10
    var wg sync.WaitGroup
    wg.Add(maxLoop)
    defer wg.Wait()

    consumer := func(ls []interface{}) error {
        fmt.Printf("get %+v \n", ls)
        wg.Add(-len(ls))
        return nil
    }

    conf := prosumer.DefaultConfig(prosumer.Consumer(consumer))
    c := prosumer.NewCoordinator(conf)
    c.Start()
    defer c.Close(true)

    for i := 0; i < maxLoop; i++ {
        fmt.Printf("try put %v\n", i)
        discarded, err := c.Put(i)
        if err != nil {
            fmt.Errorf("discarded elements %+v for err %v", discarded, err)
            wg.Add(-len(discarded))
        }
        time.Sleep(time.Second)
    }

}

close 有一个参数叫graceful,意思是是否耗尽底层的chan。

于 2019-10-14T10:39:00.847 回答