2012-01-06 55 views
0

我正在尝试设置RabbitMQ rpc。我想要一个队列来监听,并且当它收到一条消息时,我希望它回复一个匿名队列,该队列通过具有多条消息的reply_to头部指定。EventMachine EM ::迭代器正在被rabbitmq RPC阻塞

我有以下托尔任务创建一个队列,然后使用EM:迭代发送数量的消息返回到与replyt_to路由密钥中指定的队列:

desc "start_consumer", "start the test consumer" 
def start_consumer 
    conf = { 
    :host => "localhost", 
    :user => "guest", 
    :password => "guest", 
    :vhost => "/", 
    :logging => true, 
    :port => 5672 
    } 

    # n = 1 

    AMQP.start(conf) do |connection| 

    channel = AMQP::Channel.new(connection) 

    requests_queue = channel.queue("one") 
    requests_queue.purge 

    Signal.trap("INT") do 
     connection.close do 
     EM.stop{exit} 
     end 
    end 

    channel.prefetch(1) 

    requests_queue.subscribe(:ack => true) do |header, body| 
     url_search = MultiJson.decode(body) 

     EM::Iterator.new(0..5).each do |n, iter| 
     lead = get_lead(n, (n == 5)) 

     puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}" 

     AMQP::Exchange.default.publish(
             MultiJson.encode(lead), 
             :immediate => true, 
             :routing_key => header.reply_to, 
             :correlation_id => header.correlation_id 
            ) 

     iter.next 
     end 
    end 

    puts " [x] Awaiting RPC requests" 
    end   
end 

代码beloow将消息发送到上面指定的队列,并创建一个队列,用于侦听由EM :: Iterator代码发送的消息。此队列的名称是第一个队列reply_to标头的路由键。

def publish(urlSearch, routing_key) 
    EM.run do 
    corr_id = rand(10_000_000).to_s 

    requests ||= Hash.new 

    connection = AMQP.connect(:host => "localhost") 

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false) 

    callback_queue.subscribe do |header, body| 
     lead = safe_json_decode(body) 

     puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"    

     if lead["is_last"] 
     puts "in exit" 
     connection.close do 
      EM.stop{exit} 
     end 
     end 
    end 

    callback_queue.append_callback(:declare) do 
     AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id) 
    end 

    puts "initial message sent" 
    end 
end 

上面的代码工作正如我想要的一个恼人的例外。有些东西阻止了EM :: Iterator代码被异步执行。只有在EM :: Iterator代码完成后才会发送消息。我希望消息被异步发送,并在每次迭代后由匿名队列处理。目前,只有在EM :: Iterator代码完成其最后一次迭代后才会发送所有消息。

任何人都可以看到我做错了什么或建议一种不同的方法吗?我试过EM :: defer并且具有相同的行为。

回答

0

纺纱一个新线程的回答我的问题:

Thread.new do 
    5.times do 
    lead = get_lead(n, (n == 5)) 

    puts "message #{n} is_last = #{lead.is_last} at #{Time.now}"; 

    AMQP::Exchange.default.publish(
            MultiJson.encode(lead), 
            :routing_key => header.reply_to, 
            :correlation_id => header.correlation_id 
           ) 

    n += 1 
    sleep(2) 
    end 
end 

创建一个新的线程停止EventMachine的反应器被阻塞和消息发送异步。

+0

EM.defer可能更合适,因为它不需要产生线程,启动数据库连接等 – bbozo 2013-12-06 09:17:05