我正在尝试发布对传入消息的响应 messageArrived(...)
。但是发布挂起和下一行: logOutgoingMessage(topic, message)
永远不会被调用...最后我遇到了死锁并且客户端断开连接。
这是我的代码:
@Startup
@Singleton
public class AppliMqttClient implements MqttCallback {
@EJB
private AppliFacade facade;
@PostConstruct
public void start() {
try {
// connection options
connOpts = new MqttConnectOptions();
connOpts.setKeepAliveInterval(120);
connOpts.setCleanSession(true);
connOpts.setWill(TESTAMENT_TOPIC, "DOWN!!!!!!!!!!!!!!!!!!".getBytes(), 0, false);
client = new MqttClient(BROKER_URL, MQTT_CLIENT_ID);
client.setCallback(this);
connect();
client.subscribe(SUBSCRIPTION_TOPIC, QoS);
} catch (MqttException me) {
log.error("Connection to " + BROKER_URL + " failed");
logMqttException(me);
}
}
private void connect() {
// Tying a cycle of reconnects.
boolean tryConnecting = true;
while (tryConnecting) {
try {
client.connect(connOpts);
} catch (Exception e1) {
log.error("Connection attempt failed with '" + e1.getCause() + "'. Retrying.");
}
if (client.isConnected()) {
log.info("Connected to Broker " + BROKER_URL);
tryConnecting = false;
} else {
pause();
}
}
}
private void publishAMessage(String topic, String pubMsg) {
MqttMessage message = new MqttMessage(pubMsg.getBytes());
message.setQos(QoS);
// Publish the message
log.info("Publishing to topic \"" + topic + "\" qos " + QoS);
try {
// Publish to the broker
client.publish(topic, message);
// Wait until the message has been delivered to the broker
logOutgoingMessage(topic, message);
} catch (Exception e) {
log.error("Publishing to topic \"" + topic + "\" qos " + QoS + "failed.", e);
}
}
private String handleRquest(AbstractRequest request) throws JsonProcessingException {
...
return jsonResp;
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// generate the response message ID
messageId = "EB" + System.currentTimeMillis();
// log the message
logIncomingMessage(topic, message);
// handle the message
AbstractRequest request = getMapper().readValue(message.toString(), AbstractRequest.class);
// handle the request
String jsonResp = handleRquest(request);
// publish message
publishAMessage(request.getReplyTopic(), jsonResp);
}
/**
*
* Method callback is invoked when a message published by this client is
* successfully received by the broker.
*
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// NOT NEEDED
}
}