3
我有,每一个新的消息被消耗时旋转了一个新的线程RabbitMQ的队列用户:处置线程的Ruby或JRuby
AMQP.start(@conf) do |connection|
channel = AMQP::Channel.new(connection)
requests_queue = channel.queue("one")
requests_queue.subscribe(:ack => true) do |header, body|
puts "we have a message at #{Time.now} and n is #{n}"
url_search = MultiJson.decode(body)
Thread.new do
5.times do
lead = get_lead(n, (n == 5))
puts "message #{n} is_last = #{lead.is_last} at #{Time.now}";
AMQP::Exchange.default.publish(
MultiJson.encode(lead),
:routing_key => header.reply_to,
:correlation_id => header.correlation_id
)
n += 1
sleep(2)
end
end
end
end
我的问题是,我该如何后处理的线程该消息是处理?我应该使用线程池吗?
我正在使用JRuby。上述代码是否使用普通的ruby语法在幕后创建了Java JVM线程,还是应该显式创建Java线程?