我正在尝试停止从服务器端连接到流服务器的所有客户端。实际上我正在使用GracefulStop
方法来优雅地处理它。
我正在等待os.Interrupt
通道上的信号为 gRPC 执行正常停止。server.GracefulStop()
但是当客户端连接时它会卡住。
func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {
ctx := srv.Context()
updateCh := make(chan *clientapi.Update, 100)
stopCh := make(chan bool)
defer func() {
stopCh<-true
close(updateCh)
}
go func() {
ticker := time.NewTicker(1 * time.Second)
defer func() {
ticker.Stop()
close(stopCh)
}
for {
select {
case <-stopCh:
return
case <-ticker.C:
updateCh<- &clientapi.Update{Name: "notification": Payload: "sample notification every 1 second"}
}
}
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case notif := <-updateCh:
err := srv.Send(notif)
if err == io.EOF {
return nil
}
if err != nil {
s.logger.Named("Subscribe").Error("error", zap.Error(err))
continue
}
}
}
}
我希望context
in 方法ctx.Done()
可以处理它并打破 for 循环。如何关闭像这样的所有响应流?