我正在使用segmentio-go通过 golang 发布消息。我想使用我尝试过kafka-python、pykafka的 python 来阅读这些消息。
在所有这些库中,我无法接收消息,我可以通过在 golang 端创建消费者来从 golang 读取消息时读取消息。而且当我通过python发布消息时,我可以从python消费者和golang消费者那里读取它,但是那些通过golang发布消息的主题,python端的消费者完全挂起/卡住了,代码没有'不要崩溃,但也不要响应。
在 python 消费者中,我能够获取代理及其主题,仅对于那些通过 golang 发布消息的主题,python 消费者会被挂起。
Python消费者的代码:
consumer = KafkaConsumer(<topic name>, bootstrap_servers=[<ip:port>], auto_offset_reset='earliest', group_id=None, max_partition_fetch_bytes=104857600)
print(consumer.topics()) # Able to get all topic names
for message in consumer:
print(message.value)
Golang 生产者的代码
l := log.New(os.Stdout, "kafka framer: ", 0)
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{broker_address},
Topic: topic_name,
Logger: l,
})
err := w.WriteMessages(ctx, kafka.Message{
Key: []byte(strconv.Itoa(0)),
Value: []byte(data),
})
if err != nil {
panic("could not write message " + err.Error())
}
请帮忙。