我开发了一个测试 Java EE 应用程序来通过 MQTT 协议(tcp 连接)发送和接收消息。在 JBoss 7.1 下,我得到了一个 @Singleton @Startup,它实例化了一个 Eclipse Paho 客户端和一个回调类。
@Singleton
@Startup
public class PahoClient implements PahoClientLocal {
@EJB
VehicleStatusLocal vehicleS;
private MqttClient client;
SubscribeCallback callback;
private List<String> logStat = new ArrayList<String>();
@PostConstruct
public void start() throws MqttException {
try {
System.out.println("NM info: STARTING MQTT CLIENT");
client = new MqttClient("tcp://localhost", "NetworkManager", new MemoryPersistence());
//CLIENT CONNECTION OPTIONS
...
client.connect(mqttConnectOptions);
//MESSAGE LISTENER
callback = new SubscribeCallback(this, NM_STATUS_TOPIC, NM_ALARM_TOPIC);
client.setCallback(callback);
//TOPIC SUBSCRIBE
...
} catch (MqttException e) {
.....
}
public void sendMessage(String message) throws MqttPersistenceException, MqttException{
this.client.publish(DEVICE_STATUS_TOPIC, message.getBytes(), 2, false);
}
//
@Asynchronous
public void update(String matricola, String status, String latitude, String longitude, String battery){
vehicleS.updateVehicle(matricola, status, Double.parseDouble(latitude), Double.parseDouble(longitude), Integer.parseInt(battery));
}
我想将接收到的值更新到另一个 bean。但是在我的回调类中,我不能注入任何 EJB(它会中断 tcp 连接),所以我以这种方式获取 PahoClient 实例,如上所示:
callback = new SubscribeCallback(this, NM_STATUS_TOPIC, NM_ALARM_TOPIC)
因此,当我在回调方法上收到一条消息时,我会以这种方式更新值 public class SubscribeCallback 实现 MqttCallback{
public SubscribeCallback(PahoClient pahoClient, String topicNM, String topicALARM) {
this.client=pahoClient;
this.NM_STATUS_TOPIC = topicNM;
this.NM_ALARM_TOPIC = topicALARM;
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception{
..
pahoClient.update(msg);
它似乎工作正常。但在控制台中,我看到 MQTT 线程更新消息,而不是 EJB。而且我不知道如果有很多联系,这是否可行。有什么建议吗?
我正在考虑为我的 MQTT 客户端和我的 Java EE 应用程序开发一个 JCA 适配器,但我不知道这是否是正确的方法。