2015-10-04 508 views
2

我有这段代码,基本上它运行的是 channel.start_consuming()。 我希望它在一段时间后停止。Pika channel.stop_consuming不停止start_consuming循环

我认为 channel.stop_consuming()是正确的方法:

def stop_consuming(self, consumer_tag=None): 
    """ Cancels all consumers, signalling the `start_consuming` loop to 
    exit. 

但它不工作:start_consuming()永远不会结束(从执行不退出这个调用,“结束”从不打印)。

进口单元测试 进口鼠 进口线程 进口时间

_url = "amqp://user:[email protected]/aaa" 

class Consumer_test(unittest.TestCase): 

    def test_startConsuming(self): 

     def callback(channel, method, properties, body): 
      print("callback") 
      print(body) 

     def connectionTimeoutCallback(): 
      print("connecionClosedCallback") 

     def _closeChannel(channel_): 
      print("_closeChannel") 
      time.sleep(1) 
      print("close") 
      if channel_.is_open: 
       channel_.stop_consuming() 
       print("stop_cosuming") 
      else: 
       print("channel is closed") 
      #channel_.close() 

     params = pika.URLParameters(_url) 
     params.socket_timeout = 5 
     connection = pika.BlockingConnection(params) 
     #connection.add_timeout(2, connectionTimeoutCallback) 
     channel = connection.channel() 
     channel.basic_consume(callback, 
           queue='test', 
           no_ack=True) 

     t = threading.Thread(target=_closeChannel, args=[channel]) 
     t.start() 

     print("start_consuming") 
     channel.start_consuming() # start consuming (loop never ends) 
     connection.close() 
     print("end") 

connection.add_timeout解决我的问题,也许叫basic_cancel过,但我想用正确的方法。

感谢

注: 我无法回应或本(pika, stop_consuming does not work)由于我的低信誉点添加注释。

注2: 我认为我没有跨线程共享频道或连接(派卡不支持这个),因为我使用“channel_”作为参数而不是“channel”实例(Am I错误?)。

+0

这里回答了这里: http://stackoverflow.com/questions/27624166/pika-stop-consuming-does-not-work –

回答

0

我有同样的问题;因为pika不是线程安全的。即连接和通道不能在线程间安全共享。

所以我用一个单独的连接发送关机消息;然后停止使用callback函数中的原始频道。