2

我正在使用 Eclipse Kura 1.2.2、WSO2 DAS 3.0.0 和 ActiveMQ 5.12.1 在物联网世界中进行一些实验。到目前为止,我设法将 DAS 设置为 M2M 中间件服务器,将 Raspberry PI2 上的 Kura 作为 IoT 网关,并将 ActiveMQ 作为 MQTT 服务器。

我还编写了一个非常基本的 MQTT 消息生产者,它定期向 MQTT 服务器发送一条非常简单的 MQTT 消息,以模拟实际设备发送 MQTT 消息。这个想法是用定期发送数据的蓝牙设备替换这个应用程序。

当我使用 MQTTSpy 监控传入消息时,我注意到 MQTT 消息是二进制格式的。文档中明确说明了这一点,因为 Kura 在使用 MQTT 发送数据时使用了 Google 协议缓冲区。由于 DAS 不支持这种类型的 MQTT 消息,我假设这会导致服务器不响应任何传入消息。

我使用以下定义配置了 DAS 流:

{
  "streamId": "mqtt_sample_01:1.0.0",
  "name": "mqtt_sample_01",
  "version": "1.0.0",
  "nickName": "mqtt_sample_01",
  "description": "mqtt_sample_01",
  "metaData": [],
  "correlationData": [],
  "payloadData": [
    {
      "name": "temperature",
      "type": "FLOAT"
    }
  ]
}

我还使用以下代码为传入的 MQTT 消息创建了一个接收器:

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt-protobuf">
        <property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property>
        <property name="clientId">mqtt-client-01</property>
        <property name="url">tcp://192.168.1.42:1883</property>
        <property name="cleanSession">false</property>
    </from>
    <mapping customMapping="disable" type="map"/>
    <to streamName="mqtt_sample_01" version="1.0.0"/>
</eventReceiver>

注意:我也尝试过 JSON 和 XML 作为映射类型。

为了在 DAS 控制台上显示所有内容,我添加了一个发布者,使用:

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
    <from streamName="mqtt_sample_01" version="1.0.0"/>
    <mapping customMapping="disable" type="text"/>
    <to eventAdapterType="logger">
        <property name="uniqueId">mqtt_sample_logger_01</property>
    </to>
</eventPublisher>  

Kura 使用 WSO2-DAS 无法理解的 Google 协议缓冲区格式化 MQTT 消息。为了解决这个问题,存在几种可能性:

  1. MQTT 消息格式可以在 Kura 更改为不使用 Google 协议缓冲区进行编码。我发现一篇关于 SO的文章或多或少类似于这种方法,导致两者都失去了 CloudClient 类提供的所有优势。
  2. 一种可能性是编写您自己的 DAS 接收器,如本文本文所述
  3. 第三种选择是浏览 Kura 代码并创建自己的 CloudService/CloudClient 实现的实现。

就我个人而言,最好的解决方案是第二种选择,编写一个自定义事件接收器,它可以理解和解码 Kura 生成的 Google 协议缓冲区格式。其他甚至更好的解决方案也非常受欢迎。

重要提示:
ActiveMQ 在 GUI 中使用点符号表示主题名称(mqtt-sender-topic.mqtt-client-01.MQTT_APP_V1.mydata)。但是主题的真实名称使用/-notation(mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata)。

为了构建自定义接收器,我决定从原始 MQTT 接收器复制现有代码并对其进行更改以处理 protobuf 格式并将其转换为 XML(至少是这样的想法)。经过一番努力正确设置所有依赖项后,我设法构建了一个有效的自定义接收器。

不幸的是,我们并不完全是我想去的地方。与 MQTT 代理的连接似乎存在问题。接收器启动但似乎经常断开连接,并在日志中写入以下消息。

DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT Connection successful
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT connection not reachable
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100)
... 1 more

对于它的价值,代理(ActiveMQ)抱怨警告说:

WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594

我的代码肯定做错了什么导致连接断开。问题是什么。因此,我们再次欢迎任何建议、想法和解决方案!

提示
使用 -DosgiConsole 选项启动 DAS,允许您调查已部署捆绑包的状态。成功部署接收器后,命令diag [bundle_number]应输出如下内容:
osgi> diag 473
reference:file:../dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver->1.0.0.jar [473]
没有未解决的约束。

4

1 回答 1

0

WSO2 产品(例如数据分析服务器)的输入接收器示例能够处理由 Eclipse Kura(KuraPayload 格式)创建的 Google 协议缓冲区格式的消息,可以在 Google Drive 下载

发送消息的 Kura 示例应用程序也可以在 Google Drive 下载

接收方接收二进制格式的 KuraPayload 格式并将其转换为 XML。检查 XML 格式的示例应用程序。

请分享您对接收器所做的改进/修改以帮助他人。

于 2015-12-09T09:57:39.330 回答