我无法从主题中消费,我不确定我的代码或我的 kafka 配置是否有问题。我遇到的问题是它卡在“开始”的打印语句上,所以它没有收到来自频道 <-partitionConsumer.Messages() 的消息。
这些是我为我的 kafka 设置 ( https://kafka.apache.org/quickstart ) 采取的步骤,其中包含一些消息,我确定它们存在,因为当我运行以下命令时,我看到了值。
bin/kafka-console-consumer.sh --partition 0 --topic test --bootstrap-server localhost:9092 --offset 最早
- bin/zookeeper-server-start.sh 配置/zookeeper.properties
- bin/kafka-server-start.sh 配置/server.properties
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
func RetrieveConsumed() (int){
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Println("ERROR")
panic(err)
}
defer func() {
if err := consumer.Close(); err != nil {
fmt.Println("ERROR")
}
}()
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
if err != nil {
fmt.Println("ERROR")
panic(err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
fmt.Println("ERROR")
}
}()
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
consumed := 0
ConsumerLoop:
for {
fmt.Println("Starts")
select {
case msg := <-partitionConsumer.Messages():
fmt.Printf("Consumed message offset %d\n", msg.Offset)
consumed++
case <- partitionConsumer.Errors():
break ConsumerLoop
case <-signals:
break ConsumerLoop
}
}
fmt.Printf("Consumed: %d\n", consumed)
return consumed
}