2015-12-02 137 views
2

我与Eclipse 1.2.2库拉打转转,WSO2 DAS 3.0.0和ActiveMQ的5.12.1做一些在物联网的世界里进行试验。到目前为止,我设法安装DAS作为M2M中间件服务器,库拉对树莓PI2作为物联网网关和ActiveMQ的作为MQTT服务器。WSO2 - DAS消费MQTT消息

我也写一个非常基本的MQTT消息生产者周期性地发送一个非常简单的MQTT消息给MQTT服务器,以模拟在实际设备发送MQTT消息。这个想法是用一个BlueTooth设备定期发送数据来取代这个应用程序。

当我使用MQTTSpy监视收到的消息,我已经注意到了MQTT消息格式的二进制。这在文档中清楚地陈述为Kura在使用MQTT发送数据时使用Google协议缓冲区。由于DAS不支持这种类型的MQTT消息,我认为这将导致服务器无法对任何进入的消息作出响应。

我配置一个DAS流使用了以下定义:

{ 
    "streamId": "mqtt_sample_01:1.0.0", 
    "name": "mqtt_sample_01", 
    "version": "1.0.0", 
    "nickName": "mqtt_sample_01", 
    "description": "mqtt_sample_01", 
    "metaData": [], 
    "correlationData": [], 
    "payloadData": [ 
    { 
     "name": "temperature", 
     "type": "FLOAT" 
    } 
    ] 
} 

我也使用以下代码创建的呼入MQTT消息的接收器:

<?xml version="1.0" encoding="UTF-8"?> 
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> 
    <from eventAdapterType="mqtt-protobuf"> 
     <property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property> 
     <property name="clientId">mqtt-client-01</property> 
     <property name="url">tcp://192.168.1.42:1883</property> 
     <property name="cleanSession">false</property> 
    </from> 
    <mapping customMapping="disable" type="map"/> 
    <to streamName="mqtt_sample_01" version="1.0.0"/> 
</eventReceiver> 

注:我还试图JSON和XML作为映射类型。

要显示在控制台DAS一切,我添加使用出版商​​:

<?xml version="1.0" encoding="UTF-8"?> 
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher"> 
    <from streamName="mqtt_sample_01" version="1.0.0"/> 
    <mapping customMapping="disable" type="text"/> 
    <to eventAdapterType="logger"> 
     <property name="uniqueId">mqtt_sample_logger_01</property> 
    </to> 
</eventPublisher> 

库拉格式使用未由WSO2-DAS理解谷歌协议缓冲区的MQTT消息。为了解决这个问题,有几个可能:

  1. 的MQTT消息格式可以在库拉改变不使用谷歌协议缓冲区进行编码。我发现了一个article on SO这或多或少类似this approach导致无论是在由CloudClient类提供的所有优点的损失。
  2. 可能性是编写您自己的DAS接收器,如this articlethis article中所述。
  3. 第三种方法是浏览Kura代码并创建CloudService/CloudClient实现的自己实现。

我个人认为,最好的解决方案将是第二种选择,编写一个自定义事件接收器来理解和解码由库拉生成的Google协议缓冲区格式。其他的,甚至更好的解决方案也是值得欢迎的。

重要提示:
ActiveMQ的使用在GUI(MQTT-寄件人topic.mqtt-客户01.MQTT_APP_V1.mydata)主题名称的点符号。但是该主题的真实名称使用/ -notation(mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata)。

,构建定制的接收器,我决定从原来的MQTT接收器复制现有的代码,并改变它来处理protobuf的格式,并将其转换成XML(至少是这样的想法)。在努力正确设置所有依赖项后,我设法构建了一个工作的自定义接收器。

不幸的是,我们并不完全是我想成为的地方。与MQTT代理的连接似乎存在问题。接收器启动但似乎经常在日志中写入以下消息而失去连接。

DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata 
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT Connection successful 
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} - MQTT connection not reachable 
Connection lost (32109) - java.io.EOFException 
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.EOFException 
at java.io.DataInputStream.readByte(DataInputStream.java:267) 
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56) 
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100) 
... 1 more 

对于它的价值,经纪人(ActiveMQ的)抱怨了警告,指出:

WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594 

我的代码绝对必须做一些错误导致连接到被丢弃。问题是什么。所以,任何建议,想法,解决方案都是值得欢迎的!

提示
启动DAS与-DosgiConsole选项让你去调查你的部署捆绑的状态。接收机的成功部署之后,该命令DIAG [bundle_number]应输出类似:
的OSGi> DIAG 473
参考:文件:../的dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver-> 1.0。 0.jar [473]
没有未解决的限制。

+0

DAS接收者和库拉发送者的客户端ID应该有不同的值。在涉及消息类型的接收者代码中仍然存在一个小问题,该消息类型应该被设置为XML而不是MAP。我将重建一个新版本,并将其提供给那些可能会觉得有用的人。 – KDW

回答

0

为WSO2产物(例如数据分析服务器)能够通过的Eclipse库拉(KuraPayload格式)创建处理谷歌协议缓冲器格式化的消息的一个输入接收器的一个例子可以是downloaded at Google Drive

发送消息的库拉示例应用程序也可以是downloaded at Google Drive

接收器接收二进制格式的KuraPayload格式并将其转换为XML。检查示例应用程序的XML格式。

请分享您在接收器上做的改进/修改以帮助他人。