2014-09-26 54 views
0

我读this,你能解释几件事吗? 例如,我已经运行rpc_server.py在终端的不同选项卡(3选项卡)。RabbitMQ中许多客户端和许多服务器

rpc_server.py从教程:

#!/usr/bin/env python 
import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters(
     host='localhost')) 

channel = connection.channel() 

channel.queue_declare(queue='rpc_queue') 

def fib(n): 
    if n == 0: 
     return 0 
    elif n == 1: 
     return 1 
    else: 
     return fib(n-1) + fib(n-2) 

def on_request(ch, method, props, body): 
    n = int(body) 

    print " [.] fib(%s)" % (n,) 
    response = fib(n) 

    ch.basic_publish(exchange='', 
        routing_key=props.reply_to, 
        properties=pika.BasicProperties(correlation_id = \ 
                props.correlation_id), 
        body=str(response)) 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

channel.basic_qos(prefetch_count=1) 
channel.basic_consume(on_request, queue='rpc_queue') 

print " [x] Awaiting RPC requests" 
channel.start_consuming() 

不错,我需要在send.py 3请求发送:

#!/usr/bin/env python 
import pika 
import uuid 

class FibonacciRpcClient(object): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters(
       host='localhost')) 

     self.channel = self.connection.channel() 

     result = self.channel.queue_declare(exclusive=True) 
     self.callback_queue = result.method.queue 

     self.channel.basic_consume(self.on_response, no_ack=True, 
            queue=self.callback_queue) 

    def on_response(self, ch, method, props, body): 
     if self.corr_id == props.correlation_id: 
      self.response = body 

    def call(self, n): 
     self.response = None 
     self.corr_id = str(uuid.uuid4()) 
     self.channel.basic_publish(exchange='', 
            routing_key='rpc_queue', 
            properties=pika.BasicProperties(
             reply_to = self.callback_queue, 
             correlation_id = self.corr_id, 
             ), 
            body=str(n)) 
     while self.response is None: 
      self.connection.process_data_events() 
     return int(self.response) 

fibonacci_rpc = FibonacciRpcClient() 

print " [x] Requesting fib(30)" 
response = fibonacci_rpc.call(30) 
print " [.] Got %r" % (response,) 

fibonacci_rpc1 = FibonacciRpcClient() 

print " [x] Requesting fib(30)" 
response1 = fibonacci_rpc1.call(30) 
print " [.] Got %r" % (response1,) 

fibonacci_rpc2 = FibonacciRpcClient() 

print " [x] Requesting fib(30)" 
response2 = fibonacci_rpc2.call(30) 
print " [.] Got %r" % (response2,) 

这是否意味着该脚本将等待响应从第一次请求,然后发送第二次请求,再次等待响应,然后发送第三次请求?

我想在一个时刻做3个请求,而不是等待响应,然后发送新的请求。这个怎么做?

我该如何改变send.py或使用其他技术?我必须使用线程或多处理? RabbitMQ是否支持这个?

谢谢!

+0

我真的不明白你在这里做什么。你似乎已经在同一个对象中发布和消费,这完全破坏了排队的目的。其实例为 – 2014-09-26 17:21:41

+0

。打电话(30),打电话(60),打电话(90) – tim 2014-09-26 17:46:24

+0

这种回应似乎是完全不相关的。排队系统的要点是,您有(一个或多个)应用程序将某些内容放入队列中,以及一组完全独立的客户端,这些客户端订阅队列并处理消息。在同一个班级和同一个过程中采取这两种行动完全没有意义。 – 2014-09-26 17:58:41

回答

0

如果您希望一次发送所有三个请求,则需要使用线程。一个简单的解决方案将是这个样子:

import threading 
from time import sleep 

def make_rpc_call(value): 
    fibonacci_rpc = FibonacciRpcClient() 
    print " [x] Requesting fib({0})".format(value) 
    response = fibonacci_rpc.call(value) 
    print " [.] Got %r" % (response,) 

for index in xrange(5): 
    thread_ = threading.Thread(target=make_rpc_call, args=(30,)) 
    thread_.start() 
    sleep(0.1) 

请记住,鼠兔是不是线程安全的,所以你需要创建每个线程一个连接。作为替代,你可以看看我的Flask示例here Pika。很容易修改以允许您异步执行多个请求。

def rpc_call(): 
    # Fire of all three requests. 
    corr_id1 = rpc_client.send_request('1') 
    corr_id2 = rpc_client.send_request('2') 
    corr_id3 = rpc_client.send_request('3') 

    # Wait for the response on all three requests. 
    while not rpc_client.queue[corr_id1] \ 
      or not rpc_client.queue[corr_id2] \ 
      or not rpc_client.queue[corr_id3]: 
     sleep(0.1) 

    # Print the result of all three requests. 
    print rpc_client.queue[corr_id1] 
    print rpc_client.queue[corr_id2] 
    print rpc_client.queue[corr_id3] 

if __name__ == '__main__': 
    rpc_client = RpcClient('rpc_queue') 
    sleep(1) 
    print rpc_call()