我正在使用弹簧集成 Mqtt。每当我重新启动应用程序时,它都会从第一个位置开始使用消息。是否有任何属性可以使应用程序从上次读取或当前位置读取。
我找不到它。
如果没有这样的选项,那么如果下线部署了新版本并且应用程序从第一个位置开始使用,就会出现问题
下面是我用来测试的代码
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
batteryLevel, locationChange);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
adapter.setManualAcks(false);
return adapter;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { serverAddress });
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
LOGGER.info("topic : " +topic);
LOGGER.info(message.getPayload().toString())
}
}