43

我希望在两个频道上进行常规收听,当两个频道都耗尽时被阻塞。但是,如果两个通道都包含数据,我希望在处理另一个通道之前将一个通道排空。

在下面的工作示例中,我希望在处理out之前将所有内容都排空exit。我使用select没有任何优先顺序的 - 语句。我该如何解决这个问题,在退出之前处理所有 10 个输出值?

package main

import "fmt"

func sender(out chan int, exit chan bool){
    for i := 1; i <= 10; i++ {
        out <- i
    } 
    exit <- true
}

func main(){
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    L:
    for {
        select {
            case i := <-out:
                fmt.Printf("Value: %d\n", i)
            case <-exit:
                fmt.Println("Exiting")
                break L
        }
    }
    fmt.Println("Did we get all 10? Most likely not")
}
4

9 回答 9

42

该语言本机支持这一点,不需要解决方法。很简单:退出通道应该只对生产者可见。退出时,生产者关闭频道。只有当通道为空且关闭时,消费者才会退出。这可以通过对信道进行测距来实现。

下面是一个例子来说明:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    produced  = 0
    processed = 0
)

func produceEndlessly(out chan int, quit chan bool) {
    defer close(out)
    for {
        select {
        case <-quit:
            fmt.Println("RECV QUIT")
            return
        default:
            out <- rand.Int()
            time.Sleep(time.Duration(rand.Int63n(5e6)))
            produced++
        }
    }
}

func quitRandomly(quit chan bool) {
    d := time.Duration(rand.Int63n(5e9))
    fmt.Println("SLEEP", d)
    time.Sleep(d)
    fmt.Println("SEND QUIT")
    quit <- true
}

func main() {
    vals, quit := make(chan int, 10), make(chan bool)
    go produceEndlessly(vals, quit)
    go quitRandomly(quit)
    for x := range vals {
        fmt.Println(x)
        processed++
        time.Sleep(time.Duration(rand.Int63n(5e8)))
    }
    fmt.Println("Produced:", produced)
    fmt.Println("Processed:", processed)
}
于 2012-06-21T04:59:31.797 回答
32
package main

import "fmt"

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

func main() {
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        default:
        }
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        case <-exit:
            fmt.Println("Exiting")
        }
        break
    }
    fmt.Println("Did we get all 10? I think so!")
}

第一个选择的默认情况使其非阻塞。select 将在不查看退出通道的情况下耗尽 out 通道,否则不会等待。如果输出通道为空,则立即下降到第二个选择。第二个选择是阻塞的。它将等待任一通道上的数据。如果出现退出,它会处理它并允许循环退出。如果数据来了,它会回到循环的顶部并返回到排水模式。

于 2012-06-20T14:17:01.530 回答
6

另一种方法:

package main

import "fmt"

func sender(c chan int) chan int {
        go func() {
                for i := 1; i <= 15; i++ {
                        c <- i
                }
                close(c)
        }()
        return c
}

func main() {
        for i := range sender(make(chan int, 10)) {
                fmt.Printf("Value: %d\n", i)
        }
        fmt.Println("Did we get all 15? Surely yes")
}

$ go run main.go
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Did we get all 15? Surely yes
$ 
于 2012-06-20T10:45:57.487 回答
3

这是解决select的优先级问题的通用习语。

是的,至少可以说不好,但可以做到 100% 所需,没有陷阱,也没有隐藏的限制

这是一个简短的代码示例,解释如下

package main

import(
    "fmt"
    "time"
)

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }

    time.Sleep(2000 * time.Millisecond)
    out <- 11
    exit <- true
}

func main(){
    out := make(chan int, 20)
    exit := make(chan bool)

    go sender(out, exit)

    time.Sleep(500 * time.Millisecond)

    L:
    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            select {
            case i := <-out:
                fmt.Printf("Value: %d\n", i)
            case <-exit:
                select {
                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }
    fmt.Println("Did we get all 10? Yes.")
    fmt.Println("Did we get 11? DEFINITELY YES")
}

而且,这是它的工作原理,main()上面的注释:

func main(){
    out := make(chan int, 20)
    exit := make(chan bool)
    go sender(out, exit)
    time.Sleep(500 * time.Millisecond)
    L:
    for {
        select {

            // here we go when entering next loop iteration
            // and check if the out has something to be read from

            // this select is used to handle buffered data in a loop

        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            // else we fallback in here

            select {

                // this select is used to block when there's no data in either chan

            case i := <-out:
            // if out has something to read, we unblock, and then go the loop round again

                fmt.Printf("Value: %d\n", i)
            case <-exit:
                select {

                    // this select is used to explicitly propritize one chan over the another,
                    // in case we woke up (unblocked up) on the low-priority case

                    // NOTE:
                    // this will prioritize high-pri one even if it came _second_, in quick
                    // succession to the first one

                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }

    fmt.Println("Did we get all 10? Yes.")
    fmt.Println("Did we get 11? DEFINITELY YES")
}

注意:在玩有优先级的技巧之前,请确保您正在解决正确的问题。

很有可能,它可以以不同的方式解决。

尽管如此,在 Go 中优先选择 select 是一件很棒的事情。只是个梦..

注意:这是一个非常相似的答案https://stackoverflow.com/a/45854345/11729048在这个线程上,但是只有两个 select-s 是嵌套的,而不是像我一样的三个。有什么不同?我的方法更有效,我们明确希望在每次循环迭代中处理随机选择。

但是,如果高优先级通道没有缓冲,和/或您不希望它上有大量数据,只有零星的单个事件,那么更简单的两阶段习语(如该答案)就足够了:

L:
for {
    select {
    case i := <-out:
        fmt.Printf("Value: %d\n", i)
    case <-exit:
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            fmt.Println("Exiting")
            break L
        }
    }
}

它基本上是 2 和 3 阶段,第 1 阶段被删除。

再一次:在大约 90% 的情况下,您认为确实需要优先考虑 chan 切换情况,但实际上不需要。

这是一个单行代码,可以包装在一个宏中:

for {
    select { case a1 := <-ch_p1: p1_action(a1); default: select { case a1 := <-ch_p1: p1_action(a1); case a2 := <-ch_p2: select { case a1 := <-ch_p1: p1_action(a1); default: p2_action(a2); }}}
}

如果你想优先考虑两个以上的情况怎么办?

那么你有两个选择。第一个 - 使用中间 goroutines 构建一棵树,以便每个 fork 都是二进制的(上面的习语)。

第二种选择是使优先级分叉更多然后加倍。

以下是三个优先级的示例:

for {
    select {
    case a1 := <-ch_p1:
        p1_action(a1)
    default:
        select {
        case a2 := <-ch_p2:
            p2_action(a2)
        default:
            select {    // block here, on this select
            case a1 := <-ch_p1:
                p1_action(a1)
            case a2 := <-ch_p2:
                select {
                case a1 := <-ch_p1:
                    p1_action(a1)
                default:
                    p2_action(a2)
                }
            case a3 := <-ch_p3:
                select {
                case a1 := <-ch_p1:
                    p1_action(a1)
                case a2 := <-ch_p2:
                    p1_action(a2)
                default:
                    p2_action(a3)
                }
            }
        }
    }
}

也就是说,整个结构在概念上分为三个部分,作为原始(二进制)部分。

再一次:机会是,你可以设计你的系统,这样你就可以避免这种混乱。

PS,修辞问题:为什么 Golang 没有将它内置到语言中???问题是修辞问题。

于 2020-04-07T18:30:03.600 回答
2

这是另一种选择。

消费者守则:

  go func() {
    stop := false
    for {
      select {
      case item, _ := <-r.queue:
        doWork(item)
      case <-r.stopping:
        stop = true
      }
      if stop && len(r.queue) == 0 {
        break
      }
    }
  }()
于 2017-09-25T02:52:56.900 回答
1

我创建了一个相当简单的解决方法。它可以满足我的要求,但是如果其他人有更好的解决方案,请告诉我:

exiting := false
for !exiting || len(out)>0 {
    select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        case <-exit:
            exiting = true
            fmt.Println("Exiting")
    }
}

我没有在接收时退出,而是标记了一个退出,一旦我确定没有任何东西留在chan out.

于 2012-06-20T12:30:47.610 回答
1

我认为索尼娅的回答是不正确的。这是我的解决方案,有点复杂。

package main

import "fmt"

func sender(out chan int, exit chan bool){
    for i := 1; i <= 10; i++ {
        out <- i
    } 
    exit <- true
}

func main(){
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    L:
    for {
        select {
            case i := <-out:
                fmt.Printf("Value: %d\n", i)
            case <-exit:
                for{
                    select{
                    case i:=<-out:
                        fmt.Printf("Value: %d\n", i)
                    default:
                        fmt.Println("Exiting")
                        break L
                    }
                }
                fmt.Println("Exiting")
                break L
        }
    }
    fmt.Println("Did we get all 10? Yes!")
}
于 2017-08-24T06:23:18.187 回答
0

就我而言,我真的想优先考虑来自一个通道的数据而不是另一个通道,而不仅仅是一个带外退出信号。为了其他有同样问题的人的利益,我认为这种方法在没有潜在竞争条件的情况下有效:

OUTER:
for channelA != nil || channelB != nil {

    select {

    case typeA, ok := <-channelA:
        if !ok {
            channelA = nil
            continue OUTER
        }
        doSomething(typeA)

    case nodeIn, ok := <-channelB:
        if !ok {
            channelB = nil
            continue OUTER
        }

        // Looped non-blocking nested select here checks that channelA
        // really is drained before we deal with the data from channelB
        NESTED:
        for {
            select {
            case typeA, ok := <-channelA:
                if !ok {
                    channelA = nil
                    continue NESTED
                }
                doSomething(typeA)

            default:
                // We are free to process the typeB data now
                doSomethingElse(typeB)
                break NESTED
            }
        }
    }

}
于 2017-07-06T10:29:15.340 回答
0

使用缓冲通道有什么具体原因make(chan int, 10)吗?

您需要使用您正在使用的无缓冲通道与缓冲通道。

只是删除10,应该是刚刚make(chan int)

这样,sender函数中的执行只能在来自通道的最后一条消息被语句出队exit <- true继续执行该语句。如果该语句尚未执行,则无法在 goroutine 中访问。outi := <-outexit <- true

于 2017-09-13T13:54:39.403 回答