2
我有这段代码,基本上它运行的是 channel.start_consuming()。 我希望它在一段时间后停止。Pika channel.stop_consuming不停止start_consuming循环
我认为 channel.stop_consuming()是正确的方法:
def stop_consuming(self, consumer_tag=None):
""" Cancels all consumers, signalling the `start_consuming` loop to
exit.
但它不工作:start_consuming()永远不会结束(从执行不退出这个调用,“结束”从不打印)。
进口单元测试 进口鼠 进口线程 进口时间
_url = "amqp://user:[email protected]/aaa"
class Consumer_test(unittest.TestCase):
def test_startConsuming(self):
def callback(channel, method, properties, body):
print("callback")
print(body)
def connectionTimeoutCallback():
print("connecionClosedCallback")
def _closeChannel(channel_):
print("_closeChannel")
time.sleep(1)
print("close")
if channel_.is_open:
channel_.stop_consuming()
print("stop_cosuming")
else:
print("channel is closed")
#channel_.close()
params = pika.URLParameters(_url)
params.socket_timeout = 5
connection = pika.BlockingConnection(params)
#connection.add_timeout(2, connectionTimeoutCallback)
channel = connection.channel()
channel.basic_consume(callback,
queue='test',
no_ack=True)
t = threading.Thread(target=_closeChannel, args=[channel])
t.start()
print("start_consuming")
channel.start_consuming() # start consuming (loop never ends)
connection.close()
print("end")
connection.add_timeout解决我的问题,也许叫basic_cancel过,但我想用正确的方法。
感谢
注: 我无法回应或本(pika, stop_consuming does not work)由于我的低信誉点添加注释。
注2: 我认为我没有跨线程共享频道或连接(派卡不支持这个),因为我使用“channel_”作为参数而不是“channel”实例(Am I错误?)。
这里回答了这里: http://stackoverflow.com/questions/27624166/pika-stop-consuming-does-not-work –