0

我处理两个python队列。
我的问题的简短描述:
客户端通过waiting queue(q1),然后他们(客户端)之后送达。
waiting queue的大小不能大于N(在我的程序中为10)。
如果waiting queue变满,则客户端传递给outside queue(q2,大小20)。如果外部队列已满,客户端将被拒绝并且不能提供服务。
每个离开等待队列的客户端都允许来自外部队列的另一个客户端加入等待队列。无法将项目从一个队列排队到另一个队列

使用队列应该是线程安全的。

下面我大致实现了我想要的。但是我遇到了这个问题 - 在执行serve函数期间将客户端从队列外(q1)排入等待队列(q2)。我想我失去了或忘记了一些重要的东西。我认为这种说法q1.put(client)永久封锁,但不知道为什么。

import time 
import threading 
from random import randrange 
from Queue import Queue, Full as FullQueue 


class Client(object): 
    def __repr__(self): 
     return '<{0}: {1}>'.format(self.__class__.__name__, id(self)) 


def serve(q1, q2): 
    while True: 
     if not q2.empty(): 
      client = q2.get() 
      print '%s leaved outside queue' % client 
      q1.put(client) 
      print '%s is in the waiting queue' % client 
      q2.task_done() 

     client = q1.get() 
     print '%s leaved waiting queue for serving' % client 
     time.sleep(2) # Do something with client 
     q1.task_done() 


def main(): 
    waiting_queue = Queue(10) 
    outside_queue = Queue(20) 

    for _ in range(2): 
     worker = threading.Thread(target=serve, args=(waiting_queue, outside_queue)) 
     worker.setDaemon(True) 
     worker.start() 

    delays = [randrange(1, 5) for _ in range(100)] 

    # Every d seconds 10 clients enter to the waiting queue 
    for d in delays: 
     time.sleep(d) 
     for _ in range(10): 
      client = Client() 
      try: 
       waiting_queue.put_nowait(client) 
      except FullQueue: 
       print 'Waiting queue is full. Please line up in outside queue.' 
       try: 
        outside_queue.put_nowait(client) 
       except FullQueue: 
        print 'Outside queue is full. Please go out.' 

    waiting_queue.join() 
    outside_queue.join() 
    print 'Done' 

回答

0

最后我找到了解决方案。我检查文档更周到 If full() returns True it doesn’t guarantee that a subsequent call to get() will not blockhttps://docs.python.org/2/library/queue.html#Queue.Queue.full

这就是为什么q1.full()不是几个线程可靠。在将项插入队列和检查队列已满后,我添加了互斥锁:

class Client(object): 
    def __init__(self, ident): 
     self.ident = ident 

    def __repr__(self): 
     return '<{0}: {1}>'.format(self.__class__.__name__, self.ident) 


def serve(q1, q2, mutex): 
    while True: 
     client = q1.get() 
     print '%s leaved waiting queue for serving' % client 
     time.sleep(2) # Do something with client 
     q1.task_done() 

     with mutex: 
      if not q2.empty() and not q1.full(): 
       client = q2.get() 
       print '%s leaved outside queue' % client 
       q1.put(client) 
       print '%s is in the waiting queue' % client 
       q2.task_done() 


def main(): 
    waiting_queue = Queue(10) 
    outside_queue = Queue(20) 

    lock = threading.RLock() 

    for _ in range(2): 
     worker = threading.Thread(target=serve, args=(waiting_queue, outside_queue, lock)) 
     worker.setDaemon(True) 
     worker.start() 

    # Every 1-5 seconds 10 clients enter to the waiting room 
    i = 1 # Used for unique <int> client's id 
    while True: 
     delay = randrange(1, 5) 
     time.sleep(delay) 
     for _ in range(10): 
      client = Client(i) 
      try: 
       lock.acquire() 
       if not waiting_queue.full(): 
        waiting_queue.put(client) 
       else: 
        outside_queue.put_nowait(client) 
      except FullQueue: 
       # print 'Outside queue is full. Please go out.' 
       pass 
      finally: 
       lock.release() 

      i += 1 

    waiting_queue.join() 
    outside_queue.join() 
    print 'Done' 

现在它运行良好。

相关问题