2014-09-04 79 views
0

我有一个大文件作为我的Python代码的输入,它会产生相应的输出文件。但是,这需要太多时间,我想加快速度。如何平行我的Python代码

现在,我把大文件分成1000个小文件。我想要一个能够启动1000个线程的小脚本,每个线程使用我的原始python代码并拥有自己的输出文件。

任何人都可以给我一个示例/示例代码?

+0

它不会加速它(很多,如果有的话)...你应该只将它拆分成许多部分,因为有可用的内核......并使用多处理库......在python中使用线程的唯一原因是当你有一个图形用户界面时,你不想阻塞...否则你应该使用多处理,如果你需要并行数据处理 – 2014-09-04 17:40:44

+0

你的工作实际上是由CPU(处理)还是由I/O(读写文件)支配?在决定如何并行化之前,您需要通过配置文件来确定_first_。 – abarnert 2014-09-04 17:41:40

+0

它是通过I/O,每条线耗费4ms CPU,我假设I/O应该更高。 – Jin 2014-09-04 17:44:01

回答

1
  • 如果没有1000级的处理器呢,劈1000有没有兴趣......在相反,大的开销...
  • 多线程是管理I/O阻塞更加有效,不并行处理工作。
  • 如果你的问题是我在同一个设备/ O,使更多的会增加其负荷,增加开销(头移动,缓存垃圾...)

什么您在搜索更加多: https://docs.python.org/2/library/multiprocessing.html

+0

我明白了。非常感谢 – Jin 2014-09-04 17:46:05

1

如果您决定使用multiprocessing,那么您将以非常类似的方式完成此操作。 你可以尝试这样的事情:

import Queue 
from threading import Thread 

file_list = ['filea', 'fileb'] 

def do_stuff(q): 
    while True: 
     try: 
      file_name = q.get(False) 
     except Queue.Empty: 
      # Handle empty queue here 
      break 
     # do what ever you need here 
     print file_name 
     q.task_done() 

q = Queue.Queue(maxsize=0) 
num_threads = 2 

for x in file_list: 
    q.put(x) 

for i in range(num_threads): 
    worker = Thread(target=do_stuff, args=(q,)) 
    worker.setDaemon(True) 
    worker.start() 

q.join() 
+0

为什么在'multiprocessing'库有一个内置的时候自己建立一个池(它还增加了你没有构建的所有类型的特性,比如返回值,正确的信号完成和等待等),' concurrent.futures'(或'futures' backport)有一个更容易使用的执行器? – abarnert 2014-09-04 17:48:03

+0

@abarnert同意,但这仅仅是一个例子,显示一个想法。 – Vor 2014-09-04 17:48:39

+0

好的,但是为什么要在几行代码中以艰难的方式构建一个例子,让事情脱节,什么时候可以用简单的方式在几行代码中编写例子并覆盖所有内容? – abarnert 2014-09-04 17:49:19

5

首先,使用1000线几乎肯定会慢下来,不加快速度。即使您的代码完全受I/O限制,1000仍在推动许多平台调度程序的限制,并且您将花费更多时间进行上下文切换,而不是进行实际工作。接下来,您需要知道您的代码是否受CPU限制(即对内存中的信息进行实际处理)或I/O限制(即等待磁盘读取和写入等操作)。


如果你的代码是CPU绑定的,你可以保持CPU的繁忙相当一致的,想要每个核心正是1个线程。这样,通过最少量的上下文切换(和缓存抖动,假设大部分工作在不可变或非共享值上完成),您可以获得最大的并行度。另外(除非那些工作是在专门设计的C扩展中完成的,比如numpy),你希望这些线程在不同的进程中,因为每个进程每次只有一个线程可以一次运行Python解释器,这要归功于全球口译员锁定。

所以,你想要的东西几乎肯定是一个进程池。最简单的方法是使用concurrent.futures.ProcessPoolExecutor,可能带有max_workers参数(也许从16开始,然后尝试上下调整以查看是否有帮助)。


如果,另一方面,你的代码主要是I/O限制,那么几十个线程是合理的,特别是如果延迟是不可预测的,但在同一进程没有1000和线程会工作正常,因为一个线程可以运行Python解释器,而其他线程都在等待操作系统完成磁盘操作。

所以,在这种情况下,你想要一个concurrent.futures.ThreadPoolExecutor


如果你不知道,不知道怎么找出来,用线程池构建它,然后再使用ActivityMonitor或任何Windows现在调用它的进程管理器或您的300个选择喜爱在Linux上观看它运行;如果最终得到100%的核心和其他25%以下的核心,那么你太过于CPU而不能使用线程。幸运的是,切换到进程池是一个微不足道的变化 - 用ProcessPoolExecutor代替ThreadPoolExecutor,并删除max_workers参数,以便Python选择最佳的默认值,现在就完成了。


无论哪种情况,文档中的示例都足够好,因此没有理由要求其他示例代码。