我正在编写一个 threading.Thread 的子类,它实现了“可调用”方法。我该如何改进这段代码?
通常,线程对象上的任意方法都会在调用它的那个线程中运行。而这些“可调用”方法则在它们所属的线程(即被调用的那个线程对象)中运行,并且要么阻塞调用线程的后续执行,要么返回一种获取结果的途径,具体取决于创建该方法时使用的是哪个装饰器。代码如下:
import collections
import functools
import threading
class CallableThread(threading.Thread):
    """A thread whose decorated methods execute on the thread itself.

    Methods wrapped with ``blocking_method`` or ``nonblocking_method`` do not
    run in the caller's thread. Calling one places the call on a queue; the
    ``run`` loop of this thread takes calls off the queue and executes them
    one at a time, in the order they were enqueued.

    Call ``stop`` to shut the thread down. Calls that were already queued
    when ``stop`` was requested are still executed before ``run`` returns;
    calls made afterwards raise ``RuntimeError`` instead of waiting forever
    on a worker that will never serve them.
    """

    def __init__(self, *args, **kwargs):
        """Create the thread; all arguments are passed to ``threading.Thread``."""
        super(CallableThread, self).__init__(*args, **kwargs)
        # _enqueued_calls stores tuples that encode a function call:
        # (f, args, kwargs, response, errors, call_complete).
        # It is processed by the run method.
        self._enqueued_calls = collections.deque()
        # _enqueue_call_permission guards the queue and the stop flag, and is
        # notified whenever either of them changes.
        self._enqueue_call_permission = threading.Condition()
        # Set here rather than in run() so that a stop() issued before the
        # thread has started is not overwritten when run() begins.
        self._run = True

    def _submit(self, f, args, kwargs, response, errors):
        """Queue one call and return the condition notified on completion.

        ``response`` receives the return value of ``f``; ``errors`` receives
        the exception if ``f`` raises. Both are deques and may be the same
        object.

        Raises:
            RuntimeError: if ``stop`` has already been called.
        """
        call_complete = threading.Condition()
        with self._enqueue_call_permission:
            if not self._run:
                raise RuntimeError(
                    "cannot call a method on a stopped CallableThread")
            self._enqueued_calls.append(
                (f, args, kwargs, response, errors, call_complete))
            self._enqueue_call_permission.notify()
        return call_complete

    @staticmethod
    def blocking_method(f):
        """A decorator function to implement a blocking method on a thread.

        The returned function enqueues the decorated function and blocks the
        calling thread until the decorated function has run on this thread.
        It then returns the decorated function's return value unmodified, or
        re-raises the exception the decorated function raised.

        Raises (from the returned function):
            RuntimeError: if the thread has already been stopped.
        """
        @functools.wraps(f)
        def register(self, *args, **kwargs):
            if threading.current_thread() is self:
                # Already on the worker thread: queueing the call and waiting
                # for ourselves to serve it would deadlock, so run it inline.
                return f(self, *args, **kwargs)
            response = collections.deque()
            errors = collections.deque()
            call_complete = self._submit(f, args, kwargs, response, errors)
            with call_complete:
                # Loop, not "if": the call may have completed before we got
                # here, and a wait may wake without a notification.
                while not (response or errors):
                    call_complete.wait()
            if errors:
                raise errors.popleft()
            return response.popleft()
        return register

    @staticmethod
    def nonblocking_method(f):
        """A decorator function to implement a non-blocking method on a thread.

        The returned function enqueues the decorated function and returns
        immediately with a tuple ``(call_complete, response)``: a
        ``threading.Condition`` that is notified once the call has finished
        and a ``collections.deque`` that then holds exactly one item. That
        item is the return value of the decorated function, or the exception
        instance if the decorated function raised.

        The call may already have finished by the time the caller looks, so
        check ``response`` (while holding ``call_complete``) before waiting.

        Raises (from the returned function):
            RuntimeError: if the thread has already been stopped.
        """
        @functools.wraps(f)
        def register(self, *args, **kwargs):
            response = collections.deque()
            # Results and exceptions share the single deque handed back to
            # the caller, which is the only channel this interface exposes.
            call_complete = self._submit(f, args, kwargs, response, response)
            return call_complete, response
        return register

    def run(self):
        """Execute queued calls until stopped and the queue is empty."""
        while True:
            with self._enqueue_call_permission:
                while self._run and not self._enqueued_calls:
                    self._enqueue_call_permission.wait()
                if not self._enqueued_calls:
                    # Only reachable once stop() was requested and every
                    # pending call has been served.
                    return
                (f, args, kwargs, response, errors,
                 call_complete) = self._enqueued_calls.popleft()
            # Execute outside the queue lock so other threads can keep
            # enqueueing while this call is in progress.
            try:
                outcome, sink = f(self, *args, **kwargs), response
            except Exception as exc:
                # Hand the failure to the caller instead of letting it kill
                # the worker and leave the caller waiting forever.
                outcome, sink = exc, errors
            with call_complete:
                sink.append(outcome)
                call_complete.notify_all()

    def stop(self):
        """Signal the thread to stop once all already-queued calls are done."""
        with self._enqueue_call_permission:
            self._run = False
            # Wake the worker if it is idle so it can notice the flag.
            self._enqueue_call_permission.notify_all()
if __name__ == '__main__':
    # Demo: ten client threads each ask one worker thread to increment and
    # print a shared counter four times.

    class TestThread(CallableThread):
        """Increment a counter on each call and print the value."""
        counter = 0

        @CallableThread.blocking_method
        def increment(self, tag):
            # The single-argument parenthesised form is valid both as a
            # Python 2 print statement and as a Python 3 function call.
            print("{0} FROM: {1}".format(self.counter, tag))
            self.counter += 1

    class ThreadClient(threading.Thread):
        """Call ``increment`` on the given thread four times, passing ``tag``."""

        def __init__(self, callable_thread, tag):
            super(ThreadClient, self).__init__()
            self.callable_thread = callable_thread
            self.tag = tag

        def run(self):
            for _ in range(4):
                self.callable_thread.increment(self.tag)

    t = TestThread()
    # Daemon, so the demo process can exit even if the worker thread has not
    # finished shutting down by the time the main thread ends.
    t.daemon = True
    t.start()
    clients = [ThreadClient(t, i) for i in range(10)]
    for client in clients:
        client.start()
    for client in clients:
        client.join()
    t.stop()
正如你无疑已经看到的,我使用静态方法作为装饰器。装饰器接收它所修饰的方法,并返回一个函数;该函数把被修饰的函数连同调用时传入的参数、一个用于在调用完成时发出 notify 的 threading.Condition 实例,以及一个用于写入结果的 collections.deque 实例一起放入队列。
有什么建议吗?我对命名、架构要点和健壮性特别感兴趣。
编辑:我在手边没有解释器的情况下根据建议做了一些修改,结果把代码弄坏了,所以我刚刚把它修复了。
我觉得你在涉及 _enqueue_call_permission 的地方(def run 中)有一个拼写错误 – cpf 2010-07-26 04:41:37