2010-07-26 80 views
1

我在写一个threading.Thread子类,它实现了“可调用”方法。我该如何改进此代码

通常使用线程上的任意方法,该方法以任何线程调用它的方式运行。这些方法在它们被调用的线程中运行,并阻止调用线程中的进一步执行,或者返回一种方法来获取结果,具体取决于使用哪个装饰器来创建方法。代码是:

import threading 
import collections 


class CallableThread(threading.Thread): 

    def __init__(self, *args, **kwargs): 
     #_enqueued_calls is used to store tuples that encode a functin call. It is processed by the run method 
     self._enqueued_calls = collections.deque() 
     # _enqueue_call_permission is for callers to signal that they have placed something on the queue 
     self._enqueue_call_permission = threading.Condition()     
     super(CallableThread, self).__init__(*args, **kwargs) 

    @staticmethod 
    def blocking_method(f): 
     u"""A decorator function to implement a blocking method on a thread""" 
     # the returned function enqueues the decorated function and blocks until the decorated function 
     # is called and returns. It then returns the value unmodified. The code in register runs 
     # in the calling thread and the decorated method runs in thread that it is called on 
     def register(self, *args, **kwargs): 
      call_complete = threading.Condition() 
      response = collections.deque() 
      with self._enqueue_call_permission: 
       self._enqueued_calls.append(((f, self, args, kwargs), response, call_complete)) 
       self._enqueue_call_permission.notify() 
      with call_complete: 
       if not response: 
        call_complete.wait() 
      return response.popleft() 
     return register 

    @staticmethod 
    def nonblocking_method(f): 
     u"""A decorator function to implement a non-blocking method on a thread""" 
     # the returned function enqueues the decorated function and returns a tuple consisting of a condition 
     # to wait for and a deque to read the result out of. The code in register runs in the calling thread 
     # and the decorated method runs in thread that it is called on 
     def register(self, *args, **kwargs): 
      call_complete = threading.Condition() 
      response = collections.deque() 
      with self._enqueue_call_permission: 
       self._enqueued_calls.append(((f, self, args, kwargs), None, None)) 
       self._enqueue_call_permission.notify() 
      return call_complete, response 
     return register  

    def run(self):   
     self._run = True 
     while self._run: # while we've not been killed 
      with self._enqueue_call_permission: # get the condition so we can wait on it. 
       if not self._enqueued_calls: 
        self._enqueue_call_permission.wait() # wait if we need to 
      while self._enqueued_calls: 
       ((f, self, args, kwargs), response_deque, call_complete) = self._enqueued_calls.popleft() 
       with call_complete:  
        response_deque.append(f(self, *args, **kwargs)) 
        call_complete.notify() 


    def stop(self): 
     u""" Signal the thread to stop""" 
     self._run = False      



if __name__=='__main__': 
    class TestThread(CallableThread): 
     u"""Increment a counter on each call and print the value""" 
     counter = 0 
     @CallableThread.blocking_method 
     def increment(self, tag): 
      print "{0} FROM: {1}".format(self.counter, tag) 
      self.counter += 1 

    class ThreadClient(threading.Thread): 
     def __init__(self, callable_thread, tag): 
      self.callable_thread = callable_thread 
      self.tag = tag 
      super(ThreadClient, self).__init__() 

     def run(self): 
      for i in range(0, 4): 
       self.callable_thread.increment(self.tag) 

    t = TestThread() 
    t.start() 
    clients = [ThreadClient(t, i) for i in range(0, 10)] 
    for client in clients: 
     client.start() 
##  client.join() 
    for client in clients: 
     client.join() 
    t.stop() 

正如你毫无疑问看到,我使用静态方法作为装饰器。该装饰带,他们被应用到并返回它被调用的参数一起入队的装饰功能的功能的方法,一个threading.Condition实例来完成的notifycollections.deque实例来纠正过的结果。

有什么建议吗?我在命名,建筑点和鲁棒性

编辑特别感兴趣:有些变化,我基础上作出的建议,而我是从一个解释离开打破了代码,所以我只是固定它。

+1

我觉得你有涉及_enqueue_call_permission(DEF中运行)一个错字 – cpf 2010-07-26 04:41:37

回答

2

您可以从ARGS自我解压代替使用

def register(self, *args, **kwargs) 

。另外,为什么使用_RUN的大写字母?

+0

上不拆包自己好一点。关于_RUN的长篇大论。它开始作为一个全局应用于所有线程的另一个模块,并且我总是将这些变量大写。我会继续和改变它,因为它似乎不自然,现在你提到它 – aaronasterling 2010-07-27 00:19:36