当 Producer 设置为 customPartition 时,无法从生产者向 pulsar 发送消息(请参阅下面的代码)。
Producer<byte[]> producer = client.newProducer()
.topic(pulsarTopic)
//.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter( new MessageRounterImpl())
.create();
发送消息的代码:
producer.send(msg);
MessageRouterImpl 随机生成范围从 0 到 5 的数字,如下代码
public class MessageRounterImpl implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
Random r = new Random();
return r.nextInt((0 - 5) + 1);
}
}
我的问题是为什么我无法使用CustomPartition从生产者那里发送消息以及为什么我收到低于日志消息的消息
从定时器线程批处理来自批处理容器的消息
使用 0 条消息对来自批处理容器的消息进行批处理
使用 MessageRoutingMode.RoundRobinPartition 和 MessageRoutingMode.SinglePartition 我能够从生产者那里发送消息。
如果有人对此有所了解,那将非常有帮助。