我有一个 Pulsar 主题,我通过多个生产者向该主题发布消息。这些生产者只是 goroutines,我正在使用SendAsync
生产者上的方法发送消息,但我不断收到Got unexpected send receipt for message
警告。我查看了客户端的库代码(Pulsar 的 golang 客户端),发现只有当生产者未在此 map 中注册为侦听器时,才会显示此警告map[uint64]ConnectionListener
。但是,当生产者未注册为侦听器时,我如何发布?
func benchmarkConcurrentProducers(b *testing.B, pool []pulsar.Producer, concur int, payload []byte) {
wg := &sync.WaitGroup{}
wg.Add(concur)
for i := 0; i < concur; i++ {
go Produce(pool[i], payload, wg)
}
wg.Wait()
}
func BenchmarkProducer(b *testing.B) {
type config struct {
Pulsarcfg struct {
Url string `required: "true"`
}
}
cfg := &config{}
err := configor.Load(cfg, "../config.yml")
require.Nil(b, err)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: cfg.Pulsarcfg.Url,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
require.Nil(b, err)
defer client.Close()
concur := 4
producerPool := make([]pulsar.Producer, concur)
for i := 0; i < concur; i++ {
p, err := New(client, string(i))
require.Nil(b, err)
defer p.Close()
producerPool[i] = p
}
path, err := filepath.Abs("../events.json")
if err != nil {
log.Fatalf("invalid payload file path, %v", err)
}
f, err := ioutil.ReadFile(path)
if err != nil {
log.Fatalf("bad payload: %v", err)
}
b.Run("benchmark producer", func(b *testing.B) {
// run the producer b.N times
for n := 0; n < b.N; n++ {
benchmarkConcurrentProducers(b, producerPool, concur, f)
}
})
}