2015-04-03 81 views
0

这里是我使用的代码:为什么多处理池的这个实现不起作用?

import pandas as pd 
import sys, multiprocessing 

train_data_file = '/home/simon/ali_bigdata/train_data_user_2.0.csv' 
user_list_file = '/home/simon/ali_bigdata/user_list.txt' 



def feature_extract(list_file, feature_extract_func): 
    tmp_list = [line.strip() for line in open(list_file)] 

    pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
    results_list = pool.map(feature_extract_func, tmp_list) 

    for tmp in results_list: 
     for i in tmp: 
      print i,"\t", 
     print "\n" 

    pool.close() 
    pool.join() 

def user_feature(tmp_user_id): 
    sys.stderr.write("process user " + tmp_user_id + " ...\n") 
    try: 
     tmp_user_df = df_user.loc[int(tmp_user_id)] 
    except KeyError: 
     return [tmp_user_id, 0, 0, 0.0] 
    else: 
     if type(tmp_user_df) == pd.core.series.Series: 
      tmp_user_click = 1 
     else: 
      (tmp_user_click, suck) = tmp_user_df.shape 

     tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4] 
     if type(tmp_user_buy_df) == pd.core.frame.DataFrame: 
      tmp_user_buy = 1 
     else: 
      (tmp_user_buy, suck) = tmp_user_buy_df.shape 


     return [tmp_user_id, tmp_user_click, tmp_user_buy, 0.0 if tmp_user_click == 0 else float(tmp_user_buy)/tmp_user_click] 


df = pd.read_csv(train_data_file, header=0) 
df_user = df.set_index(['user_id']) 
feature_extract(user_list_file, user_feature) 

我得到的错误是:

process user 102761946 ... 
process user 110858443 ... 
process user 131681429 ... 
Traceback (most recent call last): 
    File "extract_feature_2.0.py", line 53, in <module> 
    feature_extract(user_list_file, user_feature) 
    File "extract_feature_2.0.py", line 13, in feature_extract 
    results_list = pool.map(feature_extract_func, tmp_list) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get 
    raise self._value 
KeyError: 'the label [False] is not in the [index]' 

当程序运行一段时间它发生。

那么这个错误是什么意思,我该如何多处理这个映射函数呢?

这里的输入数据格式

user_id,item_id,behavior_type,user_geohash,item_category,date,time 
99512554,37320317,3,94gn6nd,9232,2014-11-26,20 
9909811,266982489,1,,3475,2014-12-02,23 
98692568,27121464,1,94h63np,5201,2014-11-19,13 

回答

0

很难在里面多用的功能调试错误。您应该关闭多处理器进行调试,然后在修复时重新打开它。我通常在我的函数中有一个mp=True参数,默认情况下它在多处理模式下运行该函数,但可以设置为False以使用常规的非多处理运行map(使用if测试),以便我可以调试这些类型的错误。

所以,你可以设置你的功能就是这样,并与mp=False参数运行进行调试:

def feature_extract(list_file, feature_extract_func, mp=True): 
    tmp_list = [line.strip() for line in open(list_file)] 

    if mp: 
     pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
     results_list = pool.map(feature_extract_func, tmp_list) 
    else: 
     results_list = map(feature_extract_func, tmp_list) 

    for tmp in results_list: 
     for i in tmp: 
      print i,"\t", 
     print "\n" 

    if mp: 
     pool.close() 
     pool.join() 

此外,Pool自动默认使用可用CPU的数量,这样就不会需要设置进程的数量,除非你想要与之不同的东西。

此外,在这种情况下,使用生成器表达式而不是列表理解更有效率(尽管您可以更轻松地切分列表理解,因此对于调试,您可能希望使用列表理解跳转到前面对导致问题的指标):

所以,一旦调试完毕,更换:

tmp_list = [line.strip() for line in open(list_file)] 

有:

tmp_list = (line.strip() for line in open(list_file)) 
+0

在调试方法的帮助下,我终于修好了!感谢:) – 2015-04-07 07:04:37

+0

@simon_xia:到底是什么问题? – mhawke 2015-04-07 07:37:58

+0

@mhawke它是由'df_user.loc [int(tmp_user_id)]'的返回值引起的,当只有一行满足条件时,它可能是一个系列。所以'tmp_user_buy_df = tmp_user_df.loc [tmp_user_df ['behavior_type'] == 4]'这个语句将细分 – 2015-04-11 01:11:35

0

你的避风港没有显示出现错误时正在播放的任何数据。请在您的问题中发布能够触发问题的代表性数据 - 如果您的问题可以复制,那么帮助您会容易得多。

我认为错误在该行发生的事情:

tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4] 

tmp_user_df['behavior_type'] == 4返回一个布尔值 - 真或假 - 然后将其用作标签。因为标签False不是数据帧中的标签/系列KeyError: 'the label [False] is not in the [index]'被引发。我很疑惑为什么True案件显然有效,但是我们还没有看到您的数据,所以可能会有解释。

你可能打算传递一个布尔数组作为选择器;如果是这样,将行为类型查找包装在列表中,例如,G:

tmp_user_buy_df = tmp_user_df.loc[[tmp_user_df['behavior_type'] == 4]] 

另外,isinstance()优于type(x) == X,见this comprehensive explanation,可以将线

if type(tmp_user_df) == pd.core.series.Series: 

更改为

if isinstance(tmp_user_df, pd.core.series.Series): 

if type(tmp_user_buy_df) == pd.core.frame.DataFrame: 

if isinstance(tmp_user_buy_df, pd.core.frame.DataFrame): 
+0

抱歉有点晚了,我刚刚粘贴了数据格式。我想要做的是选择相同用户的behavior_type 4 – 2015-04-07 01:18:29

+0

感谢您使用'isinstance()'的建议,在其他情况下可以正常工作,但在这种情况下无法工作,不知怎的,这很奇怪 – 2015-04-11 01:22:01

+0

'isinstance ()'的例子相当于使用'type'作为所使用的对象。 – mhawke 2015-04-11 02:08:53