0

我正在尝试学习如何在Spring-Integration中处理MQTT 消息。已创建一个转换器,它为每个 MQTT 主题订阅一个 MqttPahoMessageDrivenChannelAdapter 以使用和转换消息。

问题是我们的数据提供者正计划“加速”发布消息。因此,不是有几个(<=10)主题,每个主题都有大约 150 个字段的消息,而是计划将这些字段中的每一个发布到单独的 MQTT 主题。

这意味着我的转换器将不得不消耗 ca。1000个mqtt主题,但不知道是否:

  1. spring-integration 是否仍然是一个不错的选择。引起afaik。提到的适配器使用 PAHO MqttClient,它将在一个线程中使用它订阅的所有主题的消息,并且创建这些适配器的 1000 个实例是一种过度杀伤力。
  2. 如果我们进一步坚持使用 spring-integration 并使用提供的组件,那么为所有字段创建一个单一的入站适配器是否是一个好主意,这些字段以前在一个主题的消息中,但将转换从适配器 bean 转移到一个单独的 bean(进行转换)通过执行器通道连接到适配器,从而在某个线程池上并行执行这些字段的转换。

提前感谢您的回答!

4

1 回答 1

0

我觉得你的想法很有道理。

为此,您需要实现直通 MqttMessageConverter并提供MqttMessageas apayloadtopic标头:

public class PassThroughMqttMessageConverter implements MqttMessageConverter {

    @Override
    public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
        return MessageBuilder.withPayload(mqttMessage)
                .setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
                .build();
    }

    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        return null;
    }

    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        return null;
    }

}

ExecutorChannel因此,在 custom中提到之后,您真的可以在下游执行目标转换transformer

您也可以考虑实现一个自定义MqttPahoClientFactory(可能也可以使用的扩展DefaultMqttPahoClientFactory)并提供一个自定义ScheduledExecutorService注入到MqttClient您要在getClientInstance().

于 2019-01-25T14:27:47.860 回答