0

这就是我使用 Spring Integration 定义 MQTT 连接的方式。我不确定这是否可行,但我们能否让 MQTT 订阅者在收到 10 条消息之后才开始处理?目前订阅者在每条消息发布后都能立即正常工作。

    // Connector settings injected by Spring; the bean methods below read the
    // broker URL, user name, password and client id from it.
    @Autowired
    ConnectorConfig config;


    /**
     * Builds the Paho client factory from the injected connector settings
     * (server URI, user name and password).
     *
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        final DefaultMqttPahoClientFactory pahoFactory = new DefaultMqttPahoClientFactory();
        pahoFactory.setUserName(config.getUser());
        pahoFactory.setPassword(config.getPass());
        pahoFactory.setServerURIs(config.getUrl());
        return pahoFactory;
    }

    /**
     * Declares the inbound MQTT channel adapter subscribed to the "ALERT" and
     * "READING" topics. Its output channel is whatever
     * {@code mqttRouterChannel()} returns.
     *
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducer inbound() {
        final MqttPahoMessageDrivenChannelAdapter mqttInbound = new MqttPahoMessageDrivenChannelAdapter(
                config.getClientid(), mqttClientFactory(), "ALERT", "READING");

        mqttInbound.setOutputChannel(mqttRouterChannel());
        mqttInbound.setQos(1);
        mqttInbound.setConverter(new DefaultPahoMessageConverter());
        mqttInbound.setCompletionTimeout(5000);
        return mqttInbound;
    }

   /**
    * Routes messages arriving on "mqttRouterChannel" to a transformer channel
    * chosen by the MQTT topic they were published on.
    */
   @MessageEndpoint
   public class MessageRouter {

       private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);

       static final String ALERT = "ALERT";
       static final String READING = "READING";

       /**
        * Picks the name of the channel the message should be forwarded to.
        *
        * @param topic value of the {@code mqtt_topic} message header
        * @return "alertTransformerChannel" for {@link #ALERT},
        *         "readingTransformerChannel" for {@link #READING},
        *         or {@code null} for any other topic
        */
       @Router(inputChannel = "mqttRouterChannel")
       public String route(@Header("mqtt_topic") String topic) {
           switch (topic) {
               case ALERT:
                   logger.info("alert message received");
                   return "alertTransformerChannel";
               case READING:
                   logger.info("reading message received");
                   return "readingTransformerChannel";
               default:
                   // Unknown topics used to fall through without a trace. The return
                   // value stays null as before, but the topic is now logged.
                   // NOTE(review): how the framework treats a null channel name
                   // depends on the router configuration - confirm.
                   logger.warn("no route configured for mqtt topic '{}'", topic);
                   return null;
           }
       }
   }
4

1 回答 1

1

我需要一次批量处理 10 条消息

这不是 MqttPahoMessageDrivenChannelAdapter 的职责。

我们在那里使用的 MqttCallback 具有如下语义:

 * @param topic name of the topic on the message was published to
 * @param message the actual message.
 * @throws Exception if a terminal error has occurred, and the client should be
 * shut down.
 */
public void messageArrived(String topic, MqttMessage message) throws Exception;

因此,由于 Paho 客户端的性质,我们无法在此通道适配器上对它们进行批处理。

我们可以从 Spring Integration 的角度向您建议的是Aggregator EIP 实现。

在您的情况下,您应该在消息发送到路由器之前(也就是在 mqttRouterChannel 之前),为 AggregatorFactoryBean 的 @Bean 添加一个 @ServiceActivator。

这可能很简单:

/**
 * Aggregator fed from "mqttAggregatorChannel" whose output goes to
 * "mqttRouterChannel".
 *
 * <p>All messages share one correlation key, and the group is released once
 * it holds 10 messages. A group timeout of 1000 is also configured, with
 * partial results sent on expiry (the timeout is presumably in milliseconds;
 * confirm against the Spring Integration aggregator documentation).
 *
 * @return the configured aggregator factory bean
 */
@Bean
@ServiceActivator(inputChannel = "mqttAggregatorChannel")
AggregatorFactoryBean mqttAggregator() {
    final AggregatorFactoryBean batcher = new AggregatorFactoryBean();

    // Grouping: constant key, so every message lands in the same group.
    batcher.setCorrelationStrategy(message -> 1);
    batcher.setReleaseStrategy(new MessageCountReleaseStrategy(10));

    // Expiry handling for groups that never reach the release count.
    batcher.setGroupTimeoutExpression(new ValueExpression<>(1000));
    batcher.setSendPartialResultOnExpiry(true);
    batcher.setExpireGroupsUponCompletion(true);

    // Output: default group processor, result forwarded to the router channel.
    batcher.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
    batcher.setOutputChannelName("mqttRouterChannel");
    return batcher;
}

请参阅参考手册中的更多信息。

于 2017-08-03T20:08:38.347 回答