1

当 Producer 设置为 customPartition 时,无法从生产者向 pulsar 发送消息(请参阅下面的代码)。

Producer<byte[]> producer = client.newProducer()
                .topic(pulsarTopic)
                //.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter( new MessageRounterImpl())
                .create();

发送消息的代码:

 producer.send(msg);

MessageRouterImpl 随机生成范围从 0 到 5 的数字,如下代码

public class MessageRounterImpl  implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
   Random r = new Random();
   return r.nextInt((0 - 5) + 1);
   
}

}

我的问题是为什么我无法使用CustomPartition从生产者那里发送消息以及为什么我收到低于日志消息的消息

从定时器线程批处理来自批处理容器的消息

使用 0 条消息对来自批处理容器的消息进行批处理

使用 MessageRoutingMode.RoundRobinPartition 和 MessageRoutingMode.SinglePartition 我能够从生产者那里发送消息。

如果有人对此有所了解,那将非常有帮助。

4

1 回答 1

1

首先,请注意分区主题应该在发布者开始发送消息之前显式创建,例如:

bin/pulsar-admin topics create-partitioned-topic persistent://tenant/namespace/partitioned-topic-name --partitions 5

其次,以下行将引发异常(它不能将负值作为输入处理):

return r.nextInt((0 - 5) + 1);

可以使用以下一种:

return r.nextInt(5);
于 2020-03-06T14:20:30.940 回答