2017-05-04 331 views
0

我是一个试图整合qpidApache Camel的新手。我需要编写java代码来使用qpid从队列读取和写入。javax.jms.JMSException:现有连接被远程主机强制关闭

所以首先我从qpid网站下载了JMS例子。我试图运行的代码是。

/* 
* 
* Licensed to the Apache Software Foundation (ASF) under one 
* or more contributor license agreements. See the NOTICE file 
* distributed with this work for additional information 
* regarding copyright ownership. The ASF licenses this file 
* to you under the Apache License, Version 2.0 (the 
* "License"); you may not use this file except in compliance 
* with the License. You may obtain a copy of the License at 
* 
* http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, 
* software distributed under the License is distributed on an 
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
* KIND, either express or implied. See the License for the 
* specific language governing permissions and limitations 
* under the License. 
* 
*/ 
package org.apache.qpid.jms.example; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.ExceptionListener; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.naming.Context; 
import javax.naming.InitialContext; 

public class HelloWorld { 
    public static void main(String[] args) throws Exception { 
     try { 
      // The configuration for the Qpid InitialContextFactory has been supplied in 
      // a jndi.properties file in the classpath, which results in it being picked 
      // up automatically by the InitialContext constructor. 
      Context context = new InitialContext(); 

      ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); 
      Destination queue = (Destination) context.lookup("myQueueLookup"); 

      Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD")); 
      connection.setExceptionListener(new MyExceptionListener()); 
      connection.start(); 

      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

      MessageProducer messageProducer = session.createProducer(queue); 
      MessageConsumer messageConsumer = session.createConsumer(queue); 

      TextMessage message = session.createTextMessage("Hello world!"); 
      messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); 
      TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L); 

      if (receivedMessage != null) { 
       System.out.println(receivedMessage.getText()); 
      } else { 
       System.out.println("No message received within the given timeout!"); 
      } 

      connection.close(); 
     } catch (Exception exp) { 
      System.out.println("Caught exception, exiting."); 
      exp.printStackTrace(System.out); 
      System.exit(1); 
     } 
    } 

    private static class MyExceptionListener implements ExceptionListener { 
     @Override 
     public void onException(JMSException exception) { 
      System.out.println("Connection ExceptionListener fired, exiting."); 
      exception.printStackTrace(System.out); 
      System.exit(1); 
     } 
    } 
} 

这是依赖于文件::

# Set the InitialContextFactory class to use 
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory 
# Define the required ConnectionFactory instances 
# connectionfactory.<JNDI-lookup-name> = <URI> 
connectionfactory.myFactoryLookup = amqp://localhost:5672 

# Configure the necessary Queue and Topic objects 
# queue.<JNDI-lookup-name> = <queue-name> 
# topic.<JNDI-lookup-name> = <topic-name> 
queue.myQueueLookup = queue 
topic.myTopicLookup = topic 

现在我明白了序这个工作,我需要的东西被称为Broker服务。做一些研究,我发现我可以使用RabbitMQ来达到这个目的。所以,我下载我的Windows机器上,我试图连接到它的端口localhost:5672

但是当我运行我的代码,我得到的错误::

2017-05-04 11:28:29,329 [main   ] - ERROR JmsConnection     - Failed to connect to remote at: amqp://localhost:5672 
Caught exception, exiting. 
javax.jms.JMSException: An existing connection was forcibly closed by the remote host 
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86) 
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108) 
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:172) 
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:204) 
    at org.apache.qpid.jms.example.HelloWorld.main(HelloWorld.java:48) 
Caused by: java.io.IOException: An existing connection was forcibly closed by the remote host 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) 
    at java.lang.Thread.run(Thread.java:745) 

为什么这个错误发生?而这个端口肯定在我的本地机器上监听。再次我是JMS的新手,所以任何指导将不胜感激:)

+0

服务器正在运行吗? – Ishnark

+0

是的。我做了一个'telnet 127.0.0.1 5672',它工作。 –

+0

不确定这是你的qpid版本的情况,但似乎有一个[qpid 0.6版本中的错误,阻止正确协商协议版本](https://www.rabbitmq.com/interoperability.html),这里是声明:Qpid java客户端的0.6版本发布了一个错误,导致它无法正确协商协议版本。由于它默认为AMQP 0-10,它无法连接到任何0-8或0-9-1代理(包括RabbitMQ).' – Olivier

回答

2

正如蒂姆提到的那样,您至少也需要确保经纪人实验性AMQP 1.0插件已被加载,而您并未提及此操作。

但是,在这种情况下,它可能没有太大的区别。我没有使用JMS客户端或对RabbitMQ的其他一些AMQP 1.0客户端之前很成功,由于我报告的问题建立消费者和生产者时阻止他们在其轨道:https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/34

你提到的研究的东西在决定使用RabbitMQ,这听起来像你没有绑定到现有的服务器解决方案?如果是这样,对于支持AMQP 1.0和JMS客户端的其他服务器经常使用,您可以尝试使用ActiveMQ,ActiveMQ Artemis,Qpid for Java代理,Qpid C++代理或Qpid Dispatch路由器(不适用于Windows,您提及使用)等等。

1

它看起来像你可能会使用Qpid JMS AMQP v1.0 client这将无法连接到RabbitMQ,除非你使用RabbitMQ的实验性AMQP 1.0插件。

确保您使用的是与您正在运行的代理兼容的客户端。

相关问题