2012-07-19 78 views
2

所以我写了一个工具,它将项目列表分割成给定数量的列表(比如说10),然后将这10个列表和产生的10个线程分开,“ EvaluationThreads“(扩展threading.thread),并且每个线程评估提供的任何评估值。当我开始每个线程我把他们都到一个列表和它们产卵起飞后我有以下代码:正确的方法来处理Python中的线程连接

for th in threadList: 
    th.join() 
    someTotal = th.resultsAttribute 

这就是我如何处理等待所有线程完成,并收集他们的信息。虽然这是等待所有事情完成并收集结果的一种工作方式,但我觉得必须有一种更优雅的方式来完成它,因为这些线程可能在不同的时间完成,并且如果第一个线程开始完成,早些完成的人必须等待该线程完成才能加入。有没有办法获得这些线程的信息,并在完成时加入它们,而不是按照它们开始的顺序加入它们?我原本认为我会在线程中使用某种回调,但我不确定是否有更可接受的解决方案。

感谢您的帮助。

编辑:为了澄清,我的评估函数没有CPU绑定,我不是试图在线程之间分配文件以尽快完成它,每个线程都有固定的偶数数量的作业。

+0

为什么你的问题是一个问题?一个已经完成但尚未加入的线程正在浪费很少的资源(基本上,内核或用户空间中操作系统维护的表中的一个小表项)。 – abarnert 2012-07-20 00:25:06

+0

我想这不完全是一个问题,但它似乎是一个非常不雅的解决方案,如果空闲线程不是一个只是等待加入的问题,我想我不会担心它。 – hkothari 2012-07-20 00:39:37

+2

备注:如果您的“评估”操作受CPU限制,那么在此应用程序中使用线程可能没有获得太多好处。阅读CPython的全球解释器锁定(GIL)。 – 2012-07-20 01:00:09

回答

1

使用队列尽快从你的线程将信息推了,因为它是可用的:

比方说,这是你的线程:

class myThread(threading.Thread): 
    def __init__(self, results_queue): 
     self.results_queue = results_queue 
     #other init code here 


    def run(self): 
     #thread code here 

     self.results_queue.put(result) #result is the information you want from the thread 

这是你的主要代码:

import Queue #or "import queue" in Python 3.x 
results_queue = Queue() 

#thread init code here 

for i in xrange(num_threads_running): 
    data = results_queue.get() # queue.get() blocks until some item is available 
    #process data as it is made available 

#at this point, there is no need to .join(), since all the threads terminate as soon as they put data to the queue. 
+0

我相信只有当你明确地加入一个线程或者Thread对象被销毁时,底层的系统资源才会被释放,这意味着需要调用join(),除非你关于退出,或者你有一个用块或等价物来管理线程对象。 – abarnert 2012-07-20 01:38:45

+1

@abarnert:你有链接吗?我无法在文档中找到任何关于此的信息。所有它在Thread.join()下面说的是它“阻塞调用线程,直到它的连接方法被调用的线程终止。” – 2012-07-20 01:55:26

+1

@abarnert:从文档中,“其他线程可以调用一个线程的join()方法。这会阻塞调用线程,直到其调用join()方法的线程终止。”这意味着线程终止独立于被调用的join()方法而发生。 – 2012-07-20 02:23:47

2

对于你的主要问题:

如果你正在做更复杂的事情t这个 - 或者,特别是如果你反复这样做 - 你可能需要一个“线程组”类。其中有几十个是预制的,但如果你不喜欢其中任何一个,自己写一个就很麻烦。

然后,而不是这样的:

threadList = [] 
for argchunk in splitIntoChunks(values, 10): 
    threadList.append(threading.Thread(target=myThreadFunc, args=argchunk)) 
... 
someTotal = 0 
for th in threadList: 
    th.join() 
    someTotal += th.resultsAttribute 

你可以这样做:

threadGroup = ThreadGroup.ThreadGroup() 
for argchunk in splitIntoChunks(values, 10): 
    threadGroup.newThread(myThreadFunc, argchunk) 
threadGroup.join() 
someTotal = sum(th.resultsAttribute for th in threadGroup) 

或者,甚至更好,一个完整的线程池库,这样你就可以做到这一点:

pool = ThreadPool(10) 
for argchunk in splitIntoChunks(values, 100): 
    pool.putRequest(myThreadFunc, argchunk) 
pool.wait() 

这里的优势是,您可以轻松地在10个线程上安排100个作业,而不是10个作业o每个线程不需要维护一个队列等等。缺点是你不能只迭代线程来获得返回值,你必须迭代工作 - 理想情况下,你不想保留工作活着直到最后,以便您可以迭代它们。

这给我们带来了第二个问题,即如何从线程(或作业)中获取值。有很多种方法可以做到这一点。

你做了什么工作。你甚至不需要任何锁定。

使用回调,如你所说,也适用。但请记住,回调将在工作线程上运行,而不是主线程,因此如果它访问某个全局​​对象,则需要某种同步。

如果你想反正同步,可能没有任何好处的回调。例如,如果你所要做的只是求和一堆值,你可以设置total=[0],并让每个线程在锁内执行total[0] += myValue。 (当然,在这种情况下,它可能更有意义,只是做在主线程中求和,避免锁,但如果同化的结果的工作很多更为强大,这样的选择可能不是那么简单。)

您也可以使用某种原子对象,而不是显式锁定。例如,标准Queue.Queue和collections.deque都是不可分割的,因此每个线程都可以只设置q = Queue.Queue(),则每个线程推动通过连接你只是重复和总结队列的值之后做q.push(myValue),那么它的结果。事实上,如果每个线程都只需要推送一次队列,就可以在队列本身上执行10次阻塞获取,之后您就知道group.join()pool.wait()或其他任何会快速返回的队列。

或者你甚至可以把回调的工作到一个队列。再次,你可以做10个阻塞获取队列,每次执行结果。

如果每个线程都可以返回多个对象,他们可以把警戒值或回调到时,他们正在做的,你的主线程不断出现,直到它显示的是10个哨兵队列。

相关问题