2015-11-05 227 views
2

我有一个使用单个Azure服务总线消息队列的Java应用程序和NodeJS应用程序。互操作性Azure服务总线消息队列消息

我跟我的客户见证了一些奇怪的效果,如下。

JAVA消息生产者(使用每天青JMS教程QPID库):

TextMessage message = sendSession.createTextMessage(); 
     message.setText("Test AMQP message from JMS"); 
     long randomMessageID = randomGenerator.nextLong() >>>1; 
     message.setJMSMessageID("ID:" + randomMessageID); 
     sender.send(message); 
     System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID()); 

OUTPUT:2414932965987073843

的NodeJS消息消费者:

serviceBus.receiveQueueMessage(queue, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) { 
if(message !==null)console.log(util.inspect(message, {showHidden: false, depth: null})); 
}); 

与JMSMessageID按= ID发送消息OUTPUT:

{ body: '@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001aTest AMQP message from JMS', 
brokerProperties: 
{ DeliveryCount: 1, 
EnqueuedSequenceNumber: 5000004, 
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:28:21 GMT', 
MessageId: '2414932965987073843', 
PartitionKey: '89', 
SequenceNumber: 59672695067659070, 
State: 'Active', 
TimeToLive: 1209600, 
To: 'moequeue' }, 
contentType: 'application/xml; charset=utf-8' } 

如果我比较,为)通过serviceBus.sendQueueMessage(插入到队列中的消息,则性质是这样的:

{ body: 'test message', 
brokerProperties: 
{ DeliveryCount: 1, 
EnqueuedSequenceNumber: 0, 
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:44:03 GMT', 
MessageId: 'bc0a3d4f-15ba-434f-9fb0-1a3789885f8c', 
PartitionKey: '734', 
SequenceNumber: 37436171906517256, 
State: 'Active', 
TimeToLive: 1209600 }, 
contentType: 'text/plain', 
customProperties: 
{ message_number: 0, 
sent_date: Wed Nov 04 2015 21:44:03 GMT+0000 (UTC) } } 

所以内容类型是不同的下手 - 为什么? - 然后第一个消息有效载荷的主体中的奇怪垃圾来自哪里:@ \ u0006string \ b3http://schemas.microsoft.com/2003/10/Serialization/ \ u001a 这是序列化?这怎么能被缓解?

查找代码,以及在这里:

回答

2

Azure的服务总线支持两种不同的协议:AMQP和HTTP。使用qpid库的Java/JMS对ServiceBus使用AMQP协议。但是,NodeJS中包装的ServiceBus REST API通过HTTP协议。

服务总线支持AMQP的详细信息请参考https://azure.microsoft.com/en-us/documentation/articles/service-bus-amqp-overview/

而对于ServiceBus的REST API,请参阅https://msdn.microsoft.com/en-us/library/azure/hh780717.aspx

AMQP是二进制,应用层协议,设计用于高效 支持各种各样的消息传送应用程序和通信 图案。 - from WikiPedia

但HTTP是一种文本协议。

消息格式如下,请参考节点http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-formatMessage Format节。 AMQP规范可以参考http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html

             Bare Message 
                  | 
             .---------------------+--------------------. 
             |           | 
+--------+-------------+-------------+------------+--------------+--------------+--------+ 
| header | delivery- | message- | properties | application- | application- | footer | 
|  | annotations | annotations |   | properties | data   |  | 
+--------+-------------+-------------+------------+--------------+--------------+--------+ 
|                      | 
'-------------------------------------------+--------------------------------------------' 
              | 
             Annotated Message 

所以在Java中发送或在NodeJS中发送的消息被序列化为不同的结果。

内容\uXXXX在AMQP中形成的内容是Unicode Charater。

Unicode字符集\u0006是确认控制字符,请参考https://en.wikipedia.org/wiki/Acknowledge_character了解它。

而且Unicode字库\u001a是替代控制字符,请参考https://en.wikipedia.org/wiki/Substitute_character

它们限制了消息头中元数据的开始和结束。

+0

了解,那么有没有一种方法可以在Node中“安全”反序列化这些消息?我坚持所有客户端的相同协议是一个好主意,但现实可能会有所不同,有效负载仍然需要可交换。 – MoB

+0

@ user3506080在发送者和接收者中使用不同的协议并不是一个好主意。但我认为,如果真的有必要,您可以通过特殊字符分割内容来安全地获取正确的信息。 –

+0

虽然这些信息在技术上是正确的,但它显然是错误的。 AMQP是一种二进制传输协议,根本不会对实际的消息进行任何编码。消息的编码是JMS。如果将消息的类型更改为字节消息(JmsByteMessage),则编码将被REST服务解释为“application/octetstream”,而不是“application/xml”,并且您的消息将在节点中可读。 –

0

我们遇到了完全相同的问题,尽管在使用基于Camel的生产者的一个更多的例子中。由于我们环境的变化,我们开始遇到这些问题。

这里的问题是REST服务在编码对节点客户端的HTTP响应时如何解释JMS消息。

我们发现JmsTextmessage出于某种原因,并不完全清楚,被认为是“application/xml”类型,并且内容将按照这种方式转发。因此你在你的例子中得到了OUTPUT。

如果改为使用JmsByteMessage,则内容将被解释为“application/octet-stream”,并且不会在传输中发生损坏。

所以尽量沿着线的东西:

BytesMessage message = sendSession.createBytesMessage(); 
String body = "Test AMQP message from JMS"; 
message.writeBytes(body.getBytes(StandardCharsets.UTF_8)); 
sender.send(message); 

我们为了转移到由Node.js的客户端解译的JSON编码的数据使用。