2015-02-06 357 views
0

我无法设法编写一个服务器,通过WebSocket监听STOMP消息。我的问题在于stomp协议和JMS消费者的创建。ActiveMQ,WebSocket和Stomp

下面的代码上的createConnection

class StompDemo { 
    val uri = "ws://localhost:61614" 
    val topicName = "mytopic" 
    val broker = new BrokerService 
    broker.addConnector(uri) 
    val topic = new ActiveMQTopic(topicName) 
    val topics = Array[ActiveMQDestination](topic) 
    broker.setDestinations(topics) 
    broker.start 
    println("Started broker") 

    val connectionFactory = new ActiveMQConnectionFactory(uri) 
    val connection = connectionFactory.createConnection 
    println("Started connection") 

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
    val destination = session.createTopic(topicName) 
    val consumer = session.createConsumer(destination) 
    println("Created consumer") 

    while(true) { 
    println("Waiting for next message") 
    val message = consumer.receive 
    } 
} 

失败,出现以下异常:

Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented! 

能否请你指出问题与此代码? 如何使用AMQ通过WebSocket/Stomp以编程方式将JMS侦听器配置为队列或主题?

感谢

新更新的代码和ActiveMQ传输失败:TCP:///127.0.0.1:51309 @ 6969】传输连接:TCP://127.0.0.1:51309失败:java.io. IOException:未知数据类型:47 我想它必须与基于二进制和基于文本的相关。

仍在调查为什么会失败:

package org.tj.amq 

import org.apache.activemq.broker.BrokerService 
import org.apache.activemq.ActiveMQConnectionFactory 
import javax.jms.Session 
import javax.jms.TextMessage 

// 
// http://www.massapi.com/class/br/BrokerService.html 
// 

object AMQStompDemo extends MainLoop with Logging { 
    <<("AMQ Stomp Demo") 
    val uri = "tcp://localhost:6969" 
    val broker = new BrokerService 
    broker.setPersistent(false) 
    broker.setUseJmx(false) 
    broker.addConnector(uri) 
    broker.start 
    <<("Started broker") 

    val connectionFactory = new ActiveMQConnectionFactory(uri) 
    val connection = connectionFactory.createConnection 
    connection.start 
    <<("Started connection") 
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
    val destination = session.createQueue("test") 
    val consumer = session.createConsumer(destination) 

    while(true) { 
    <<("Ready to receive next message ...") 
    val message = consumer.receive 
    message match { 
     case tm:TextMessage => <<(s"Received text message ${tm.getText}") 
     case _ => <<(s"Received another message type $message") 
    } 
    } 

    def main(args: Array[String]): Unit = {} 
} 

trait Logging { 
    def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any") 
} 

trait MainLoop extends Logging { 
    new Thread(new Runnable() { 
    override def run = { 
     <<("Starting main loop") 
     while(true) { 
     Thread.sleep(1000) 
     } 
    } 
    }).start 
} 

的传奇仍在继续。 只要加上broker.addConnector("ws://localhost:6971") 我可以成功地通过WS从浏览器连接到队列/队列/测试

现在,最后剩下的问题 - 我得到回调,但AMQ给了我这个

[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException 
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException 
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314) 
    at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

收到第一条消息之后。

将帖子 嗯,我一直在遭受https://issues.apache.org/jira/browse/AMQ-5155 因此,使用AMQ版本5.9.0工程

我的感觉是AMQ for WebSockets太脆弱了。那么可能用Tomcat代替更保守的方法。

回答

1

通常,您不会在服务器端使用网络套接字,只需使用正常的STOMP或OpenWire连接进行连接即可。

那个siad,看着你的代码,你似乎在使用ActiveMQ JMS客户端,它既不会说STOMP也不会说Websockets,所以你注定要失败。 ActiveMQ JMS客户端使用OpenWire协议,可以通过TCP或SSL连接(HTTP可以与正确的jar一起工作)。

+0

谢谢。所以你要说的是有效地使用tcp://而不是ws://这有意义,并且只需使用web套接字lib通过正常的脚本连接,然后对它进行排序。正确? – jts 2015-02-06 11:30:42

+0

是的,您需要使用tcp://或ssl://与Java客户端。如果你需要从浏览器连接,然后使用基于stomp的库,可以做websockets,确保代理有一个ws或wss传输来做到这一点。 – 2015-02-06 18:45:10

+0

你好。我做了你的建议:基本上启动了tcp:// localhost:6969的代理和连接,并使用了一个来自apache-activemq-5.11.0 \ webapps-demo \ demo \ websocket的web客户端 - 并且当我连接到ws://localhost:6969发送消息到/ queue/test(我的代码听测试 - 浏览器告诉我哎呀!丢失连接到ws:// localhost:6969 /和服务器端tcp:///127.0.0.1: 51114 @ 6969 []传输连接到:tcp://127.0.0.1:51114失败:java.io.IOException:未知数据类型:47 ..任何建议? – jts 2015-02-07 08:37:32