0

我正在使用 Kafka 和 Golang 进行测试

我在用着:

码头工人: https ://hub.docker.com/r/bitnami/kafka

萨拉马: https ://github.com/Shopify/sarama

例子很简单,就是一个连接Kafka的Consumer: https ://godoc.org/github.com/Shopify/sarama#example-Consumer

代码是这样的:

package main

import (
    "log"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
)

func main() {

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
    ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n", msg.Offset)
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }

    log.Printf("Consumed: %d\n", consumed)
}

但是在执行时: go run main.go

它向我显示以下错误:

panic: dial tcp: lookup fd6ee3862a45: no such host

goroutine 1 [running]:
main.main()
    /Users/vn0sgkq/go/src/github.com/hectorgool/kafka1/main.go:25 +0x3f1
exit status 2

回购在这里: https ://github.com/hectorgool/kafka1/blob/master/main.go#L25

是的,我知道我缺少消息的生产者,但奇怪的是:consumer.ConsumePartition 不起作用

4

0 回答 0