0

我有一个关于在程序执行结束时处理 kafka 消费者的问题。这是负责关闭消费者的代码

func(kc *KafkaConsumer) Dispose() {
    Sugar.Info("Disposing of consumer")
    kc.mu.Lock()
    kc.Consumer.Close();
    Sugar.Info("Disposed of consumer")
    kc.mu.Unlock()
}

你可能已经注意到了,我正在使用sync.Mutex,因为消费者被多个goroutines访问。下面是另一个负责从 kafka 读取消息的片段

  func (kc *KafkaConsumer) Consume(signalChan chan os.Signal, ctx context.Context) {
    for{
        select{
        case sig := <-signalChan:
            Sugar.Info("Caught signal %v", sig)
            break
        case <-ctx.Done():
            Sugar.Info("Got context done message. Closing consumer...")
            kc.Dispose()
            break
        default:
            for{
                message, err := kc.Consumer.ReadMessage(-1); if err != nil{
                    Log.Error(err.Error())
                    return
                }
                Sugar.Infof("Got a new message %v",message)
                resp := make(chan *KafkaResponseEntity)
                go router.UseMessage(*message, resp, ctx)
                //Potential deadlock
                response := <-resp
                /*
                    Explicit commit of an offset in order to ensure 
                    that request has been successfully processed
                */
                kc.Consumer.Commit()
                Sugar.Info("Successfully commited an offset")
                Sugar.Infof("Just got a response %v", response)
                go producer.KP.Produce(response.PaymentId, response.Bytes, "some_random_topic")
            }
        }
    }
}

问题是当关闭消费者时,程序执行就会停止。有什么问题吗?我应该将 cond 与互斥锁一起使用吗?如果您对我的代码中可能出现的问题提供详尽的解释,我将非常高兴。提前致谢。

4

1 回答 1

1

我怀疑这是因为:

kc.Consumer.ReadMessage(-1)

文档状态无限期阻止,因此它没有关闭。最简单的方法是将该值设为正持续时间(例如 1 * time.Second),但如果在超时时间内未使用消息,您可能会收到超时错误。超时错误通常是无害的,但从链接的文档中需要考虑:

Timeout is returned as (nil, err) where err is
`err.(kafka.Error).Code() == kafka.ErrTimedOut`

我还不确定利用无限期阻塞并允许它被中断的好方法。如果有人知道,请发布调查结果!

于 2021-09-03T20:57:38.240 回答