我无法设法编写一个服务器,通过WebSocket监听STOMP消息。我的问题在于stomp协议和JMS消费者的创建。ActiveMQ,WebSocket和Stomp
下面的代码上的createConnection
class StompDemo {
val uri = "ws://localhost:61614"
val topicName = "mytopic"
val broker = new BrokerService
broker.addConnector(uri)
val topic = new ActiveMQTopic(topicName)
val topics = Array[ActiveMQDestination](topic)
broker.setDestinations(topics)
broker.start
println("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
println("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createTopic(topicName)
val consumer = session.createConsumer(destination)
println("Created consumer")
while(true) {
println("Waiting for next message")
val message = consumer.receive
}
}
失败,出现以下异常:
Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!
能否请你指出问题与此代码? 如何使用AMQ通过WebSocket/Stomp以编程方式将JMS侦听器配置为队列或主题?
感谢
新更新的代码和ActiveMQ传输失败:TCP:///127.0.0.1:51309 @ 6969】传输连接:TCP://127.0.0.1:51309失败:java.io. IOException:未知数据类型:47 我想它必须与基于二进制和基于文本的相关。
仍在调查为什么会失败:
package org.tj.amq
import org.apache.activemq.broker.BrokerService
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
//
// http://www.massapi.com/class/br/BrokerService.html
//
object AMQStompDemo extends MainLoop with Logging {
<<("AMQ Stomp Demo")
val uri = "tcp://localhost:6969"
val broker = new BrokerService
broker.setPersistent(false)
broker.setUseJmx(false)
broker.addConnector(uri)
broker.start
<<("Started broker")
val connectionFactory = new ActiveMQConnectionFactory(uri)
val connection = connectionFactory.createConnection
connection.start
<<("Started connection")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val destination = session.createQueue("test")
val consumer = session.createConsumer(destination)
while(true) {
<<("Ready to receive next message ...")
val message = consumer.receive
message match {
case tm:TextMessage => <<(s"Received text message ${tm.getText}")
case _ => <<(s"Received another message type $message")
}
}
def main(args: Array[String]): Unit = {}
}
trait Logging {
def <<(any : => Any) = println(s"${Thread.currentThread().getName} $any")
}
trait MainLoop extends Logging {
new Thread(new Runnable() {
override def run = {
<<("Starting main loop")
while(true) {
Thread.sleep(1000)
}
}
}).start
}
的传奇仍在继续。 只要加上broker.addConnector("ws://localhost:6971")
我可以成功地通过WS从浏览器连接到队列/队列/测试
现在,最后剩下的问题 - 我得到回调,但AMQ给了我这个
[WARN] 07 Feb 04:54:26 PM qtp1458849419-25 [] Transport Connection to: StompSocket_984548082 failed: java.io.IOException
Exception in thread "ActiveMQ InactivityMonitor Worker" java.lang.NullPointerException
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:215)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
收到第一条消息之后。
将帖子 嗯,我一直在遭受https://issues.apache.org/jira/browse/AMQ-5155 因此,使用AMQ版本5.9.0工程。
我的感觉是AMQ for WebSockets太脆弱了。那么可能用Tomcat代替更保守的方法。
谢谢。所以你要说的是有效地使用tcp://而不是ws://这有意义,并且只需使用web套接字lib通过正常的脚本连接,然后对它进行排序。正确? – jts 2015-02-06 11:30:42
是的,您需要使用tcp://或ssl://与Java客户端。如果你需要从浏览器连接,然后使用基于stomp的库,可以做websockets,确保代理有一个ws或wss传输来做到这一点。 – 2015-02-06 18:45:10
你好。我做了你的建议:基本上启动了tcp:// localhost:6969的代理和连接,并使用了一个来自apache-activemq-5.11.0 \ webapps-demo \ demo \ websocket的web客户端 - 并且当我连接到ws://localhost:6969发送消息到/ queue/test(我的代码听测试 - 浏览器告诉我哎呀!丢失连接到ws:// localhost:6969 /和服务器端tcp:///127.0.0.1: 51114 @ 6969 []传输连接到:tcp://127.0.0.1:51114失败:java.io.IOException:未知数据类型:47 ..任何建议? – jts 2015-02-07 08:37:32