2014-11-06 157 views
12

我正在尝试使用pandas数据框的多处理功能,即将数据帧拆分为8个部分。使用apply(每个部分在不同的进程中处理)应用一些函数。pandas multiprocessing apply

编辑: 这里的解决方案,我终于发现:

import multiprocessing as mp 
import pandas.util.testing as pdt 

def process_apply(x): 
    # do some stuff to data here 

def process(df): 
    res = df.apply(process_apply, axis=1) 
    return res 

if __name__ == '__main__': 
    p = mp.Pool(processes=8) 
    split_dfs = np.array_split(big_df,8) 
    pool_results = p.map(aoi_proc, split_dfs) 
    p.close() 
    p.join() 

    # merging parts processed by different processes 
    parts = pd.concat(pool_results, axis=0) 

    # merging newly calculated parts to big_df 
    big_df = pd.concat([big_df, parts], axis=1) 

    # checking if the dfs were merged correctly 
    pdt.assert_series_equal(parts['id'], big_df['id']) 
+0

'res = df.apply(process apply,axis = 1)'中有一个空格,是吗? – 2014-11-06 16:26:31

+1

@yemu你到底想通过这段代码实现什么? – Dalek 2014-11-06 16:37:14

+0

目前仅适用于饱和CPU的一个内核。我想使用多进程并使用所有内核来减少处理时间 – yemu 2014-11-06 19:29:09

回答

3

因为我没有太多数据的脚本,这是一个猜测,但我会用p.map建议,而不是apply_async与回电话。

p = mp.Pool(8) 
pool_results = p.map(process, np.array_split(big_df,8)) 
p.close() 
p.join() 
results = [] 
for result in pool_results: 
    results.extend(result) 
+0

@yemu为你做了这个工作吗? – 2014-11-06 21:38:01

+0

如果__name__ =='__main__',我必须把调用放在里面。并与其他小的变化,我设法使其工作,但我不确定池结果中的结果数据框是否以与拆分相同的顺序返回。我必须检查它。 – yemu 2014-11-07 09:24:53

+0

在这里看到的解决方案与'dask' https://stackoverflow.com/questions/37979167/how-to-parallelize-many-fuzzy-string-comparisons-using-apply-in-pandas – 2016-06-24 18:03:11

0

我也遇到同样的问题,当我使用multiprocessing.map()应用功能,以不同的块一个的大数据帧的。

我只是想补充几点以防其他人遇到同样的问题。

  1. 记得添加if __name__ == '__main__':
  2. 执行文件中的.py文件,如果使用ipython/jupyter notebook,那么你就不能运行multiprocessing(这是我的情况也是一样的,虽然我不知道)