0

我正在使用segmentio-go通过 golang 发布消息。我想使用我尝试过kafka-pythonpykafka的 python 来阅读这些消息。
在所有这些库中,我无法接收消息,我可以通过在 golang 端创建消费者来从 golang 读取消息时读取消息。而且当我通过python发布消息时,我可以从python消费者和golang消费者那里读取它,但是那些通过golang发布消息的主题,python端的消费者完全挂起/卡住了,代码没有'不要崩溃,但也不要响应。

在 python 消费者中,我能够获取代理及其主题,仅对于那些通过 golang 发布消息的主题,python 消费者会被挂起。

Python消费者的代码:

consumer = KafkaConsumer(<topic name>, bootstrap_servers=[<ip:port>], auto_offset_reset='earliest', group_id=None, max_partition_fetch_bytes=104857600) 
print(consumer.topics()) # Able to get all topic names

for message in consumer:
    print(message.value)

Golang 生产者的代码

l := log.New(os.Stdout, "kafka framer: ", 0)

w := kafka.NewWriter(kafka.WriterConfig{
   Brokers: []string{broker_address},
   Topic:   topic_name,
   Logger: l,
})

err := w.WriteMessages(ctx, kafka.Message{
        Key: []byte(strconv.Itoa(0)),
        Value: []byte(data),
})

if err != nil {
   panic("could not write message " + err.Error())
}

请帮忙。

4

0 回答 0