2013-03-28 77 views
1

我刚接触风暴,我在我的喷口中使用rabbitmq,它接收来自某个队列的元组,并且有一个客户端运行一个其他机器,将元组插入该队列中我运行了一个简单例如RabbitMQ的程序,工作正常,但是当我暴风雨中使用它喷出它武功受阻于在风暴拓扑结构中创建新连接时阻止Rabbitmq

connection = factory.newConnection(); 

即使我的RabbitMQ服务器也运行,当我运行示例代码在同一台机器上运行成功。 打印报表打印的声明

System.out.println(" setting host to 192.168.8.218..... "); 

下面是我的完整壶嘴类。

package storm.starter.spout; 

import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 
import backtype.storm.utils.Utils; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import java.util.Map; 
import java.util.Random; 
import java.net.*; 
import java.io.*; 
import java.lang.Exception; 
import java.io.IOException; 

public class RabbitmqSpout extends BaseRichSpout { 
    SpoutOutputCollector _collector; 
    public final static String QUEUE_NAME = "record"; 
    ConnectionFactory factory; 
    Connection connection; 
    Channel channel; 
    QueueingConsumer consumer; 

    @Override 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
    { 
     _collector = collector; 
     System.out.println(" [*] Intilization of spout..... "); 

     try 
     { 
      factory = new ConnectionFactory(); 
      System.out.println(" creating connection factory..... "); 
      factory.setHost("192.168.8.96"); 
      System.out.println(" setting host to 192.168.8.218..... "); 
      connection = factory.newConnection(); 
      System.out.println(" creating new connection..... "); 
      channel = connection.createChannel(); 
      System.out.println(" creating new channel..... "); 
      channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
      System.out.println(" Declaring queue..... "); 
      System.out.println(" [*] Waiting for messages. "); 
     } 
     catch(Exception exception) 
     { 
      System.out.println("Exception occurred. "+exception.getMessage()); 
     } 

    } 

    @Override 
    public void nextTuple() 
    { 
     System.out.println("In wait of tuples.... "); 
     try 
     { 
      consumer = new QueueingConsumer(channel); 
      System.out.println(" trying to consume..... "); 
      channel.basicConsume(QUEUE_NAME, true, consumer); 

      while (true) 
      { 
       System.out.println(" trying to deliver..... "); 
       QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
       String message = new String(delivery.getBody()); 
       System.out.println(" getting string..... "); 
       System.out.println(" [x] Received '" + message + "'"); 
       System.out.print("emitting Rabbitmq Queue tuple"); 
       _collector.emit(new Values(message)); 
       System.out.print("emitted Rabbitmq Queue tuple"); 
      } 
     } 

     catch(IOException io) 
     { 
      System.out.println("Exception occurred. "); 
     } 
     catch(Exception exception) 
     { 
      System.out.println("Exception occurred. "); 
     } 


    }   

    @Override 
    public void ack(Object id) { 
    } 

    @Override 
    public void fail(Object id) 
    { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    { 
     declarer.declare(new Fields("record")); 
    } 

}

+0

你可能想尝试的风暴AMQP的口,用于连接到RabbitMQ的https://github.com/Xorlev/storm-amqp-spout – Ilion 2013-04-18 08:00:42

回答