2014-12-13 127 views
3

结合我不知道如果我所试图做的是一个有效的做法,但这里有云: 我需要我的程序进行高度并行化,所以我想我可以做2-3进程,每个进程可以有2-3个线程。Python的多处理多线程

1)这可能吗? 2)有什么意思? 3)这是我的代码,但当我尝试加入过程时,它会挂起。

PQ = multiprocessing.Queue() 

[...]

def node(self, files, PQ): 

     l1, l2 = self.splitList(files) 
     p1 = multiprocessing.Process(target=self.filePro, args=(l1,PQ,)) 
     p2 = multiprocessing.Process(target=self.filePro, args=(l2,PQ,)) 
     p1.daemon = True 
     p2.daemon = True 
     p1.start() 
     p2.start() 

     p1.join() # HANGS HERE 
     p2.join() 
     while 1: 
      if PQ.empty(): 
       break 
      else: 
       print(PQ.get()) 
     PQ.join() 

    def filePro(self,lst,PQ): 
     TQ = queue.Queue() 
     l1, l2 = self.splitList(lst) 
     t1 = threading.Thread(target=self.fileThr, args=('a',l1,TQ,)) 
     t2 = threading.Thread(target=self.fileThr, args=('b',l2,TQ,)) 
     t1.daemon = True 
     t2.daemon = True 
     t1.start() 
     t2.start() 

     t1.join() 
     t2.join() 
     while 1: 
      if TQ.empty(): 
       break 
      else: 
       PQ.put(TQ.get()) 
       TQ.task_done() 
     TQ.join() 

def fileThr(self,id,lst,TQ): 
     while lst: 
      tmp_path = lst.pop() 
      if (not tmp_path[1]): 
       continue 
      for item in tmp_path[1]: 
       TQ.put(1) 
     TQ.join() 
+0

当我需要最大限度地利用cpu时,我使用了进程,当我阻塞磁盘访问,网络等操作时使用线程。因此,如果我有脚本来下载许多文件,我会创建一个线程池并使用它。如果我有一个分布式计算可以达到峰值cpu,那么我会使用一个进程池。 – 2014-12-13 03:52:27

+0

如果您希望我们调试您的代码,我们需要[最小,完整,可验证的示例](http://stackoverflow.com/help/mcve)。 – abarnert 2014-12-13 04:21:19

回答

6

1)这是可能的吗?

是的。


2)是否有任何点成吗?

是的。但通常不是你要找的点。首先,几乎每个现代操作系统都使用“扁平”调度程序;还有分散在3个方案或在8个项目8线程8个线程之间没有差异。*

*有些程序可以通过仔细使用进程内,只锁或其他同步原语在一些地方,你知道得到显著的好处你只与同一个程序中的线程共享 - 当然,通过避免在这些地方共享内存 - 但你不会通过平均地跨线程和线程跨线程分散你的作业来获得这种好处。其次,即使您在默认的CPython解释器中使用旧的SunOS,全局解释器锁(GIL)也可以确保一次只有一个线程可以运行Python代码。如果你花时间运行显式释放GIL的C扩展库中的代码(就像一些NumPy函数一样),线程可以提供帮助,但否则它们都会以序列化的方式结束。

主要情况线程和进程是有用在一起就是你有两个CPU绑定和I/O密集型工作。在这种情况下,通常一方正在喂食另一方。如果I/O提供给CPU,则使用主进程中的单个线程池来处理I/O,然后使用一组工作进程来对结果执行CPU工作。如果相反,使用工作进程池来完成CPU工作,然后让每个工作进程使用线程池来执行I/O。


3)这是我的代码,但它挂起当我尝试加入的进程。

当您不给minimal, complete, verifiable example时,调试代码非常困难。

但是,我可以看到一个明显的问题。

您正在尝试使用TQ作为生产者 - 消费者队列,与t1t2作为生产者和filePro父为消费者。在t1.join()t2.join()返回之前,您的消费者不会拨打TQ.task_done(),直到这些线程完成后才会发生。但是这些制作人不会完成,因为他们正在等待你拨打TQ.task_done()。所以,你有一个僵局。

而且,由于您的每个子进程的主线程都处于死锁状态,所以它们永远都不会完成,因此p1.join()将永远阻塞。

如果你真的想让主线程在做任何工作之前等待其他线程完成,那么你不需要生产者 - 消费者习惯用法;只要让孩子们在不打电话TQ.join()的情况下完成他们的工作并退出,并且不要打扰父母的TQ.task_done()。 (请注意,您已经在与PQ正确地做这个。)

如果,另一方面,你希望他们并行工作,不要试图join子线程,直到你完成你的循环。

+0

谢谢!这是一个非常完整的答案,但是现在我还有1个关于你的第二个答案的问题。 1)关于GIL,这是否意味着如果我产生了30个线程,它将与产卵1相同?因为你说他们最终被序列化了...... – 2014-12-13 11:42:23

+0

@安吉洛已知:不,它不完全相同。你不会得到*并行性*。也就是说,即使你有32个内核,使用30个线程的运行速度也不会比使用1快。但是,你确实得到* concurrency *。线程在你的任务之间自动交错工作,并且它们可以自动处理阻塞 - 例如,如果一个线程正在等待I/O,系统将安排一个不同的线程来运行,而不是阻塞整个程序。除非你编写的代码是_explicitly_死锁(如你的例子),一个线程不会阻止另一个进程。 – abarnert 2014-12-17 23:47:25

+0

@AngeloUknown:我找不到一个很好的资源来讨论Python特有术语的差异,但Haskell wiki上的[Parallelism vs. Concurrency](https://www.haskell.org/haskellwiki/Parallelism_vs._Concurrency)如果你忽略Haskell特定的东西,这是一个相当不错的概述。 – abarnert 2014-12-17 23:48:33