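I am trying to publish a response to an incoming message from inside messageArrived(...). However, the publish call hangs and the next line, logOutgoingMessage(topic, message), is never called. Eventually I end up with a deadlock and the client disconnects.

Here is my code: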

@Startup
@Singleton
public class AppliMqttClient implements MqttCallback {

@EJB
private AppliFacade facade;

@PostConstruct
public void start() {
    try {
        // connection options
        connOpts = new MqttConnectOptions();
        connOpts.setKeepAliveInterval(120);         
        connOpts.setCleanSession(true);
        connOpts.setWill(TESTAMENT_TOPIC, "DOWN!!!!!!!!!!!!!!!!!!".getBytes(), 0, false);

        client = new MqttClient(BROKER_URL, MQTT_CLIENT_ID);
        client.setCallback(this);
        connect();

        client.subscribe(SUBSCRIPTION_TOPIC, QoS);
    } catch (MqttException me) {
        log.error("Connection to " + BROKER_URL + " failed");
        logMqttException(me);
    }

}

private void connect() {
    // Keep trying to connect until the connection succeeds.
    boolean tryConnecting = true;
    while (tryConnecting) {
        try {
            client.connect(connOpts);
        } catch (Exception e1) {
            // Pass the exception itself: getCause() may be null and hides the real error
            log.error("Connection attempt failed. Retrying.", e1);
        }
        if (client.isConnected()) {
            log.info("Connected to Broker " + BROKER_URL);
            tryConnecting = false;
        } else {
            pause();
        }
    }
}

private void publishAMessage(String topic, String pubMsg) {
    MqttMessage message = new MqttMessage(pubMsg.getBytes());
    message.setQos(QoS);
    // Publish the message
    log.info("Publishing to topic \"" + topic + "\" qos " + QoS);
    try {
        // Publish to the broker; this is the call that hangs
        client.publish(topic, message);
        // Only reached once publish() has returned
        logOutgoingMessage(topic, message);
    } catch (Exception e) {
        log.error("Publishing to topic \"" + topic + "\" qos " + QoS + " failed.", e);
    }
}

private String handleRquest(AbstractRequest request) throws JsonProcessingException {
    ...

    return jsonResp;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // generate the response message ID
    messageId = "EB" + System.currentTimeMillis();

    // log the message
    logIncomingMessage(topic, message);

    // handle the message
    AbstractRequest request = getMapper().readValue(message.toString(), AbstractRequest.class);

    // handle the request
    String jsonResp = handleRquest(request);

    // publish message
    publishAMessage(request.getReplyTopic(), jsonResp);
}

/**
 * Invoked when a message published by this client has been
 * successfully received by the broker.
 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    // NOT NEEDED
}

}

2 Answers


I changed the code as follows.

MqttDeliveryToken token;
...
MqttTopic mqttTopic = client.getTopic(topic);
try {
  // Publish through the MqttTopic, which returns a delivery token
  token = mqttTopic.publish(new MqttMessage(pubMsg.getBytes()));
  logOutgoingMessage(topic, message);
  ...
}

But I don't understand why the first implementation did not work :x Maybe publishing with QoS 2 from inside messageArrived() is not appropriate?
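
One possible explanation, which this thread does not confirm: the blocking client.publish(...) waits for the delivery to complete, and if the library handles that completion on the same thread that is currently running messageArrived(), the wait can never end. The sketch below is only a toy model of that situation built on java.util.concurrent. It does not use the MQTT client at all, and CallbackDeadlockDemo and callbackCompletes are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model only: no MQTT library involved. */
public class CallbackDeadlockDemo {

    /**
     * Runs a "callback" on a single-threaded executor. The callback queues an
     * "acknowledgment" task on the same executor and, if blocking is true,
     * waits for it. Returns true if the callback finished within the timeout.
     */
    public static boolean callbackCompletes(boolean blocking, long timeoutMillis) throws Exception {
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        try {
            Future<?> callback = callbackThread.submit(() -> {
                CountDownLatch acked = new CountDownLatch(1);
                // The ack can only run on the thread that is busy with this callback.
                callbackThread.execute(acked::countDown);
                if (blocking) {
                    acked.await(); // never released: the ack task is queued behind us
                }
                return null;
            });
            try {
                callback.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        } finally {
            callbackThread.shutdownNow(); // interrupts the stuck callback, if any
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking callback completes:     " + callbackCompletes(true, 500));
        System.out.println("non-blocking callback completes: " + callbackCompletes(false, 500));
    }
}
```

In this model the blocking variant times out while the non-blocking one returns at once. If the assumption holds, that would match the behavior described above, where a publish call that returns a token without waiting for completion works from inside the callback.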

Answered 2015-08-13T06:14:21.613

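It is possible to send a new message within an implementation of this callback (for example, a response to this message), but the implementation must not disconnect the client, as it will be impossible to send an acknowledgment for the message being processed, and a deadlock will occur.

Source: the official documentation on eclipse.org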
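
A common way to keep the callback short, offered here as a sketch and not as something the documentation prescribes, is to hand the reply to a separate worker thread so that messageArrived() returns immediately. The example below uses only java.util.concurrent. ReplyDispatcher and ReplySender are hypothetical names, and ReplySender stands in for whatever publish call the real client would make.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReplyDispatcher {

    /** Hypothetical stand-in for the real publish call. */
    public interface ReplySender {
        void send(String topic, String payload) throws Exception;
    }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final ReplySender sender;

    public ReplyDispatcher(ReplySender sender) {
        this.sender = sender;
    }

    /** Call from the message callback: queues the reply and returns immediately. */
    public void reply(String topic, String payload) {
        worker.execute(() -> {
            try {
                sender.send(topic, payload);
            } catch (Exception e) {
                System.err.println("Reply to " + topic + " failed: " + e);
            }
        });
    }

    /** Stops accepting replies and waits for the queued ones to be sent. */
    public boolean shutdown(long timeoutMillis) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        List<String> sent = new CopyOnWriteArrayList<>();
        ReplyDispatcher dispatcher =
                new ReplyDispatcher((topic, payload) -> sent.add(topic + " <- " + payload));
        dispatcher.reply("replies/demo", "{\"status\":\"ok\"}");
        dispatcher.shutdown(1000);
        System.out.println(sent);
    }
}
```

In the code from the question, publishAMessage(...) would be the natural candidate to wrap in a ReplySender, so that any blocking happens on the worker thread and not on the thread that delivers incoming messages.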

Answered 2016-02-17T15:05:54.077