我有一个关于在程序执行结束时处理 kafka 消费者的问题。这是负责关闭消费者的代码
func(kc *KafkaConsumer) Dispose() {
Sugar.Info("Disposing of consumer")
kc.mu.Lock()
kc.Consumer.Close();
Sugar.Info("Disposed of consumer")
kc.mu.Unlock()
}
你可能已经注意到了,我正在使用sync.Mutex,因为消费者被多个goroutines访问。下面是另一个负责从 kafka 读取消息的片段
func (kc *KafkaConsumer) Consume(signalChan chan os.Signal, ctx context.Context) {
for{
select{
case sig := <-signalChan:
Sugar.Info("Caught signal %v", sig)
break
case <-ctx.Done():
Sugar.Info("Got context done message. Closing consumer...")
kc.Dispose()
break
default:
for{
message, err := kc.Consumer.ReadMessage(-1); if err != nil{
Log.Error(err.Error())
return
}
Sugar.Infof("Got a new message %v",message)
resp := make(chan *KafkaResponseEntity)
go router.UseMessage(*message, resp, ctx)
//Potential deadlock
response := <-resp
/*
Explicit commit of an offset in order to ensure
that request has been successfully processed
*/
kc.Consumer.Commit()
Sugar.Info("Successfully commited an offset")
Sugar.Infof("Just got a response %v", response)
go producer.KP.Produce(response.PaymentId, response.Bytes, "some_random_topic")
}
}
}
}
问题是当关闭消费者时,程序执行就会停止。有什么问题吗?我应该将 cond 与互斥锁一起使用吗?如果您对我的代码中可能出现的问题提供详尽的解释,我将非常高兴。提前致谢。