简短版本:有没有办法在不重新创建或循环遍历的情况下清空 go 通道?
原因:我使用两个通道来发送和接收数据,并且我有一个额外的通道来表示需要重新连接。
现在,当传输已重置/重新连接时,我想“清空”额外的通道以确保没有任何挥之不去的其他重置请求会导致事物再次重新连接。
没有循环就无法清空通道。如果您没有任何并发接收者,那么您可以使用这个简单的循环:
for len(ch) > 0 {
<-ch
}
如果您确实有并发接收者,请使用循环:
L:
for {
select {
case <-c:
default:
break L
}
}
您所描述的内容本质上是活泼的,因为可能存在重新连接频道的合法请求。我建议不要试图耗尽频道,而是跟踪时间。
在您的重新连接频道上,发布时间。完成重新连接后,记下时间。在使用重新连接通道时,丢弃任何比上次重新连接更旧的消息。
实现此目的的另一个更同步的解决方案是使重新连接通道成为布尔值。发布“true”以重新连接。重新连接完成后,发布“false”。然后消费频道,直到你发现“假”。
另一种方法是使用sync.Cond
and atomic
,类似于:
type Server struct {
s chan int
r chan int
c *sync.Cond
state uint32
}
const (
sNormal = 0
sQuitting = 1
sReconnecting = 2
)
func New() *Server {
s := &Server{
s: make(chan int),
r: make(chan int),
c: sync.NewCond(&sync.Mutex{}),
}
go s.sender()
// go s.receiver()
return s
}
func (s *Server) sender() {
//
for {
select {
case data := <-s.s:
//do stuff with data
default:
s.c.L.Lock()
L:
for {
switch atomic.LoadUint32(&s.state) {
case sNormal:
break L
case sReconnecting:
case sQuitting:
s.c.L.Unlock()
return
}
s.c.Wait()
}
s.c.L.Unlock()
}
}
}
//repeat for receiver
func (s *Server) Reconnect() {
var cannotReconnect bool
atomic.StoreUint32(&s.state, sReconnecting)
//keep trying to reconnect
if cannotReconnect {
atomic.StoreUint32(&s.state, sQuitting)
} else {
atomic.StoreUint32(&s.state, sNormal)
}
s.c.Broadcast()
}
听起来你想要一个重置 goroutine 而不是重置通道。它将有一个来自发送复位信号的一侧的输入,以及一个到接收器的输出。当这个 goroutine 收到重新连接的请求时,它会将其传递给接收者。然后它等待在第三个通道上接收来自接收器的确认,同时丢弃它收到的任何重新连接请求。所以总共3个通道,1个输入,1个输出,1个ack。