2

With Spring's integration libraries, I am trying to connect to mosquitto and read/send messages... But there are some things I couldn't figure out.

1 - When initilazing app, app connects to mosquitto, but mosquitto receives hundreds of connection requests again from same app with same id in seconds. This is the example of log :

New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.
New client connected from 127.0.0.1 as springClient (c1, k60).
Sending CONNACK to springClient (0, 0)
Received SUBSCRIBE from springClient
    0001/001/INF (QoS 1)
springClient 1 0001/001/INF
Sending SUBACK to springClient
New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.

2 - I can't get any messages from mosquitto using this configuration :

Spring XML :

<!-- This is for reading messages -->
<bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
    <beans:constructor-arg name="clientId" value="springClient" />
    <beans:constructor-arg name="clientFactory" ref="clientFactory" />
    <beans:constructor-arg name="topic" value="0001/001/INF" />
    <beans:property name="autoStartup" value="true"></beans:property>
    <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
</bean>

 <int:channel  id="fromBrokerChannel" />

Custom Adapter :

public class MqttCustomInboundAdapter extends MqttPahoMessageDrivenChannelAdapter {

    public MqttCustomInboundAdapter(String clientId,
            MqttPahoClientFactory clientFactory, String[] topic) {
        super(clientId, clientFactory, topic);
        // TODO Auto-generated constructor stub
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception
    {
        super.messageArrived(topic, message);
        System.out.println("**************** Message from topic : " + topic);
        System.out.println("**************** Message : " + new String(message.getPayload()));
    }

    public void addTopicIfNotExists(String topic)
    {
        for(String topicName:getTopic())
        {
            if(topicName.equals(topic))return;
        }

        addTopic(topic);

        System.out.println("************* Added Topic : " + topic);

        for(String topicName:getTopic())
        {
            System.out.println(topicName);
        }
    }
}

I'm not using service-activator because I need to know which topic that arrived message sent from, so I've wrapped the MqttPahoMessageDrivenChannelAdapter as its mentioned within the Spring Integration Docs

So is there any suggestions ?

4

2 回答 2

2

I managed to configure mqtt with java config

@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {

    MqttPahoMessageDrivenChannelAdapter mqtt = new MqttPahoMessageDrivenChannelAdapter( applicationName + "-sub", clientFactory( ), "/#" );
    mqtt.setQos( 2 );
    mqtt.setOutputChannel( outbount( ) );
    mqtt.setAutoStartup( true );
    mqtt.setTaskScheduler( taskScheduler( ) );

    return mqtt;
}

@Bean
public MqttPahoMessageHandler mqqtMessageHandler() {

    return new MqttPahoMessageHandler( applicationName + "-pub", clientFactory( ) );
}

@Bean
public DefaultMqttPahoClientFactory clientFactory() {

    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory( );
    clientFactory.setUserName( "test" );
    clientFactory.setPassword( "test" );
    clientFactory.setServerURIs( new String[] { "tcp://url:1883" } );
    return clientFactory;
}

@Bean
public PublishSubscribeChannel outbount() {

    PublishSubscribeChannel psc = new PublishSubscribeChannel( );
    psc.subscribe( new MessageHandler( ) {

        @Override
        public void handleMessage( Message<?> message ) throws MessagingException {

            logger.warn( message );

        }
    } );

    return psc;
}

to send a message add the following :

@Autowired
MqttPahoMessageHandler mqtt;

@RequestMapping( "/" )
public ModelAndView getHomePage() throws MqttPersistenceException, MqttException {

    Message<String> message = MessageBuilder.withPayload( "spring - test" ).setHeader( MqttHeaders.TOPIC, "/topic" ).build( );

    mqtt.handleMessage( message );

    return new ModelAndView( "home" );
}   
于 2015-08-18T16:46:13.340 回答
1

I've found it. After doing some research, I've decided to use service-activator for activating the service (which is obvious). I can get messages after this point.

And about mosquitto's strange behaivour, I figured it out that, it wasn't about the mosquitto. When MqttCustomInboundAdapter's autoStartup property set to true, application sends too many connect requests. That was the reason that mosquitto getting this connection requests and trying to connect them one by one causing the old one getting disconnected.

New XML looks like this :

 <bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
    <beans:constructor-arg name="clientId" value="springClient" />
    <beans:constructor-arg name="clientFactory" ref="clientFactory" />
    <beans:constructor-arg name="topic" value="0001/001/INF" />
    <beans:property name="autoStartup" value="false"></beans:property>
    <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
    <beans:property name="converter" ref="mqttMessageConverter"></beans:property>
</bean>

 <int:channel id="fromBrokerChannel" />
<int:service-activator input-channel="fromBrokerChannel" ref="mqttServiceActivator" ></int:service-activator>

Now I can get messages...

于 2015-06-02T18:02:47.170 回答