14

我需要定期刷新频道的内容。我用 len() 做到了这一点,我想知道是否有更好的方法来做到这一点。

http://play.golang.org/p/YzaI_2c_-F

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    commch := make(chan int, 100)
    go fillchan(commch)
    drainchan(commch)
}

func fillchan(commch chan int) {
    for {
        select {
        case <-time.Tick(30 * time.Millisecond):
            commch <- rand.Int()
        }
    }
}

func drainchan(commch chan int) {
    for {
        chanlen := len(commch) // get number of entries in channel
        time.Sleep(1 * time.Second)
        for i := 0; i <= chanlen; i++ { //flush them based on chanlen
            fmt.Printf("chan len: %s num: %s\n", chanlen, <-commch)
        }
    }
}

编辑 1:似乎这是更好的方法 http://play.golang.org/p/4Kp8VwO4yl

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    commch := make(chan int, 1000)
    go fillchan(commch)
    for {
        select {
        case <-time.Tick(1000 * time.Millisecond):
            drainchan(commch)
        }
    }
}

func fillchan(commch chan int) {
    for {
        select {
        case <-time.Tick(300 * time.Millisecond):
            commch <- rand.Int()
        }
    }
}

func drainchan(commch chan int) {
    for {
        select {
        case e := <-commch:
            fmt.Printf("%s\n",e)
        default:
            return
        }
    }
}

编辑2:删除选择,防止内存泄漏与时间。Tick http://play.golang.org/p/WybAhRE3u4

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    commch := make(chan int, 1000)
    go fillchan(commch)
    for _ = range time.Tick(1000 * time.Millisecond) {
        drainchan(commch)
    }
}

func fillchan(commch chan int) {
    for _ = range time.Tick(300 * time.Millisecond) {
        commch <- rand.Int()
    }
}

func drainchan(commch chan int) {
    for {
        select {
        case e := <-commch:
            fmt.Printf("%s\n", e)
        default:
            return
        }
    }
}
4

3 回答 3

8

清除通道内容的需要是不寻常的。通道不提供此功能 - 但您可以创建一个会以这种方式运行的 goroutine(...如果您真的想要)。

通常,您会更多地考虑在一个通道上输入并在另一个通道上输出的 goroutine;两个通道都携带相同的数据类型。原则上,您可以通过这种方式对所有缓冲通道进行建模;对于它的客户端,goroutine 的行为就像一个普通的缓冲通道,因为它传递它接收到的东西。

在 goroutine 中添加第三个通道,并在它和输入之间进行选择。这将允许您触发缓冲区的清空,而不会出现竞争条件。简单。

现在有三个通道连接到 goroutine - 两个输入和一个输出。因此,当您设计将使用它的东西时,您可以推断刷新该数据的语义是什么。

一个亲戚浮现在脑海。考虑一个具有一个输入和一个输出通道的 goroutine。它提供了一个固定大小的覆盖缓冲区,即始终准备从其输入通道读取的缓冲区,即使在输出通道被阻塞时也是如此。这也需要一个带有默认情况的选择,但不需要第三个通道。覆盖缓冲区有一个明确的用例:当通道和 goroutine 连接到循环中时,很可能会出现死锁。覆盖缓冲区作为死锁的一种候选解决方案派上用场,因为某些数据在迟到时无用 - 例如,当应用程序太忙而无法响应它们时,您可以在 GUI 中丢弃鼠标事件。

于 2013-06-25T18:03:27.173 回答
2

BatchingChannel我认为来自https://github.com/eapache/channels的内容可以满足您的需要。

于 2014-03-12T15:11:01.933 回答
1

当我遇到这个问题时,我就是这样处理的。

func (s *ReadStream) drain() {
    go func() {
        for b := range s.chan {
            blackhole(b)
        }
    }()
}

func blackhole(b []byte) {}

在您的选择可能被阻止的情况下 context.Context 似乎是错误的选择:

for {
    select {
    case <-ctx.Done():
        return
    default:
        send<-getData()
    }
}

如果 send 已满,在接收到 done 信号之前,我们将受到外部 goroutine 的支配。如果您确定消费者会在频道关闭之前继续阅读,这是可以的,但如果这些消费者可能会遇到错误情况并返回,那么您能做的就是希望。在这种特定情况下,我喜欢用内部退出通道和等待组替换上下文,然后提供公共 Kill() 方法。当然,只要我完全确定我可以丢弃数据。

func (s *ReadStream) Kill() {
    s.quit<-struct{}{}
    s.drain()   // ensure goroutine sees the cancel
    s.wg.Wait() // wait for goroutine to see the cancel
    s.close()
}
于 2017-08-04T08:50:37.790 回答