2016-11-18 84 views
3

不处理名单我有这样的脚本来处理某些URL并行:多pool.map为了

import multiprocessing 
import time 

list_of_urls = [] 

for i in range(1,1000): 
    list_of_urls.append('http://example.com/page=' + str(i)) 

def process_url(url): 
    page_processed = url.split('=')[1] 
    print 'Processing page %s'% page_processed 
    time.sleep(5) 

pool = multiprocessing.Pool(processes=4) 
pool.map(process_url, list_of_urls) 

列表是有序的,但是当我运行它,该脚本不挑从网址列表为了:

Processing page 1 
Processing page 64 
Processing page 127 
Processing page 190 
Processing page 65 
Processing page 2 
Processing page 128 
Processing page 191 

而是,我想先处理页面1,2,3,4,然后继续按照列表中的顺序。有没有这样做的选择?

+0

你是说你想将任务分解到多个内核,但仍然按顺序运行吗?或者,您想要按正确顺序处理已处理的网址的输出列表。你的问题建议第一个。 –

+0

第一个。就像你会在下载管理器中设置“最大并行下载”一样,但它们仍然按顺序运行。我想要处理4个URL,例如:从1,2,3,4开始。如果3完成并且1,2,4仍在运行,请启动第5个(http://example.com/page=5) – Hyperion

+0

但是,如果您的辅助函数具有返回值,则返回值的结果列表的顺序对应于输入参数列表的顺序。即使工作人员的功能是无序处理的。 – elcombato

回答

1

如果不通过参数CHUNKSIZE地图将计算使用该算法块:

chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 
if extra: 
    chunksize += 1 

它cuting您可迭代到task_batches和sperate进程中运行它。这就是为什么它不合适。解决方法是将块设为1.

import multiprocessing 
import time 

list_test = range(10) 

def proces(task): 
    print "task:", task 
    time.sleep(1) 

pool = multiprocessing.Pool(processes=3) 
pool.map(proces, list_test, chunksize=1) 

task: 0 
task: 1 
task: 2 
task: 3 
task: 4 
task: 5 
task: 6 
task: 7 
task: 8 
task: 9 
+2

请注意,在大多数情况下,'chunksize = 1'会严重影响性能。 – BallpointBen

0

多处理是一个异步操作,意思是它是按照定义非顺序的。线程(或python的case进程)从你的列表中拉出url,并且不保证哪个进程会先完成。因此url 1可能会在url 64之前开始处理,但由于网络I/O中的随机性,例如url 64可能会先完成。

问问自己是否真的需要首先执行这些操作。如果答案是肯定的,那么最好的办法就是执行一个阻止步骤 - 先强制所有并行计算完成,然后对完成的结果进行排序。因此,如果您的URL列表非常大,并且您希望某些顺序元素仍然可以利用并行化,则可以将您的列表分块,然后通过上面的并行逻辑按顺序运行每个块。