2012-01-10 36 views
3

我有,每一个新的消息被消耗时旋转了一个新的线程RabbitMQ的队列用户:处置线程的Ruby或JRuby

AMQP.start(@conf) do |connection| 
    channel = AMQP::Channel.new(connection) 

    requests_queue = channel.queue("one") 

    requests_queue.subscribe(:ack => true) do |header, body| 
    puts "we have a message at #{Time.now} and n is #{n}" 

    url_search = MultiJson.decode(body) 

    Thread.new do 
     5.times do 
     lead = get_lead(n, (n == 5)) 

     puts "message #{n} is_last = #{lead.is_last} at #{Time.now}"; 

     AMQP::Exchange.default.publish(
             MultiJson.encode(lead), 
             :routing_key => header.reply_to, 
             :correlation_id => header.correlation_id 
            ) 

     n += 1 
     sleep(2) 
     end 
    end 
    end 
end 

我的问题是,我该如何后处理的线程该消息是处理?我应该使用线程池吗?

我正在使用JRuby。上述代码是否使用普通的ruby语法在幕后创建了Java JVM线程,还是应该显式创建Java线程?

回答

1

你不需要手动配置我认为的线程,你应该使用ruby线程,从我收集的他们是jruby中的java线程,这是jruby得到它的好表现。

一个常见的事情是旋转几个线程,然后在继续之前加入所有线程,如果你想确保在下一步之前全部完成,但在这里似乎并不需要。

这里有一个小测试程序:

# foo.rb 
a = Thread.new { print "a"; sleep(1); print "b"; print "c" } 
require 'pp' 
pp Thread.list 
puts "foo" 
sleep(2); 
pp Thread.list 
puts "bar" 

正如你可以看到催生后台线程被自动删除。 (测试jruby以及1.9.2

$ ruby foo.rb 
a[#<Thread:0x00000100887678 run>, #<Thread:0x0000010086c7d8 sleep>] 
foo 
bc[#<Thread:0x00000100887678 run>] 
bar