2

我开发了一个测试 Java EE 应用程序来通过 MQTT 协议(tcp 连接)发送和接收消息。在 JBoss 7.1 下,我得到了一个 @Singleton @Startup,它实例化了一个 Eclipse Paho 客户端和一个回调类。

@Singleton
@Startup
public class PahoClient implements PahoClientLocal {

@EJB
VehicleStatusLocal vehicleS;

private MqttClient client;
SubscribeCallback callback;
private List<String> logStat = new ArrayList<String>();



@PostConstruct
public void start() throws MqttException {
    try {
        System.out.println("NM info: STARTING MQTT CLIENT");
            client = new MqttClient("tcp://localhost", "NetworkManager", new MemoryPersistence());

        //CLIENT CONNECTION OPTIONS
        ...
        client.connect(mqttConnectOptions);

        //MESSAGE LISTENER
        callback = new SubscribeCallback(this, NM_STATUS_TOPIC, NM_ALARM_TOPIC);
        client.setCallback(callback);

        //TOPIC SUBSCRIBE
        ...
        } catch (MqttException e) {
        .....
}
public void sendMessage(String message) throws MqttPersistenceException, MqttException{
    this.client.publish(DEVICE_STATUS_TOPIC, message.getBytes(), 2, false);
} 
//
@Asynchronous
public void update(String matricola, String status, String latitude, String longitude, String  battery){
    vehicleS.updateVehicle(matricola, status, Double.parseDouble(latitude), Double.parseDouble(longitude), Integer.parseInt(battery));
}

我想将接收到的值更新到另一个 bean。但是在我的回调类中,我不能注入任何 EJB(它会中断 tcp 连接),所以我以这种方式获取 PahoClient 实例,如上所示:

callback = new SubscribeCallback(this, NM_STATUS_TOPIC, NM_ALARM_TOPIC)

因此,当我在回调方法上收到一条消息时,我会以这种方式更新值 public class SubscribeCallback 实现 MqttCallback{

public SubscribeCallback(PahoClient pahoClient, String topicNM, String topicALARM) {
    this.client=pahoClient;
    this.NM_STATUS_TOPIC = topicNM;
    this.NM_ALARM_TOPIC = topicALARM;
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception{
..
pahoClient.update(msg);

它似乎工作正常。但在控制台中,我看到 MQTT 线程更新消息,而不是 EJB。而且我不知道如果有很多联系,这是否可行。有什么建议吗?

我正在考虑为我的 MQTT 客户端和我的 Java EE 应用程序开发一个 JCA 适配器,但我不知道这是否是正确的方法。

4

0 回答 0