2014-09-06 107 views
20

我有一个大的数据帧(数百万行)。如何有效地遍历连续的熊猫数据帧块

我希望能够对其执行groupby操作,但只是按任意连续(最好是大小相同)的行子集进行分组,而不是使用单个行的任何特定属性来决定它们将转到哪个组。

用例:我想通过IPython中的平行映射将函数应用于每一行。因为函数根据一行中的一行计算结果,所以哪些行将访问哪个后端引擎并不重要。 (从概念上讲至少,在现实中它的矢量。)

我想出这样的事情:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to 
max_idx = dataframe.index.max() 
tenths = ((10 * dataframe.index)/(1 + max_idx)).astype(np.uint32) 

# Use this value to perform a groupby, yielding 10 consecutive chunks 
groups = [g[1] for g in dataframe.groupby(tenths)] 

# Process chunks in parallel 
results = dview.map_sync(my_function, groups) 

但这似乎很啰嗦,但并不保证同等大小的块。特别是如果索引是稀疏或非整数或其他。

任何建议更好的方法?

谢谢!

回答

20

实际上,您不能保证等大小的块:行数可能是质数,毕竟,在这种情况下,您唯一的块选项将是大小为1的块或一个大块。我倾向于将一个数组传递给groupby。从开始:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15) 
>>> df[0] = range(15) 
>>> df 
    0   1   2   3   4 
0 0 0.746300 0.346277 0.220362 0.172680 
0 1 0.657324 0.687169 0.384196 0.214118 
0 2 0.016062 0.858784 0.236364 0.963389 
[...] 
0 13 0.510273 0.051608 0.230402 0.756921 
0 14 0.950544 0.576539 0.642602 0.907850 

[15 rows x 5 columns] 

在那里我已经有意将其设置为0取得无信息的索引,我们只是在我们的大小决定(在这里10)和整数除以它的数组:

>>> df.groupby(np.arange(len(df))//10) 
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c> 
>>> for k,g in df.groupby(np.arange(len(df))//10): 
...  print(k,g) 
...  
0 0   1   2   3   4 
0 0 0.746300 0.346277 0.220362 0.172680 
0 1 0.657324 0.687169 0.384196 0.214118 
0 2 0.016062 0.858784 0.236364 0.963389 
[...] 
0 8 0.241049 0.246149 0.241935 0.563428 
0 9 0.493819 0.918858 0.193236 0.266257 

[10 rows x 5 columns] 
1  0   1   2   3   4 
0 10 0.037693 0.370789 0.369117 0.401041 
0 11 0.721843 0.862295 0.671733 0.605006 
[...] 
0 14 0.950544 0.576539 0.642602 0.907850 

[5 rows x 5 columns] 

虽然您始终可以使用.iloc[a:b]忽略索引值并按位置访问数据,但基于切片DataFrame的方法可能会在索引与该索引不兼容时失败。

+0

这就是我想到的!在技​​术上,“df.groupby(np.arange(len(df))//(len(df)/ 10))”得到固定数量的组(每个核心1个)而不是固定大小。出于某种原因,我并没有想到,分组密钥实际上并不需要与索引完全相关...... – 2014-09-06 18:01:58

+1

值得一提的是,为了提高效率,最好使用“迭代器”来读取原始文件( https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html)和“chunksize”,以便read_csv函数执行读取操作,并且每个片段都可以传递到单独的进程,如@Ryan – 2017-11-08 00:01:16

19

我不确定这是否正是你想要的,但我发现这些石斑函数another SO thread对于做一个多处理器池非常有用。

下面是该线程,这可能会做这样的事情,你想要什么简单的例子:

import numpy as np 
import pandas as pds 

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd']) 

def chunker(seq, size): 
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) 

for i in chunker(df,5): 
    print i 

它给你这样的:

  a   b   c   d 
0 0.860574 0.059326 0.339192 0.786399 
1 0.029196 0.395613 0.524240 0.380265 
2 0.235759 0.164282 0.350042 0.877004 
3 0.545394 0.881960 0.994079 0.721279 
4 0.584504 0.648308 0.655147 0.511390 
      a   b   c   d 
5 0.276160 0.982803 0.451825 0.845363 
6 0.728453 0.246870 0.515770 0.343479 
7 0.971947 0.278430 0.006910 0.888512 
8 0.044888 0.875791 0.842361 0.890675 
9 0.200563 0.246080 0.333202 0.574488 
      a   b   c   d 
10 0.971125 0.106790 0.274001 0.960579 
11 0.722224 0.575325 0.465267 0.258976 
12 0.574039 0.258625 0.469209 0.886768 
13 0.915423 0.713076 0.073338 0.622967 

我希望帮助。

EDIT

在这种情况下,我在(约)以这种方式使用该功能与pool of processors

from multiprocessing import Pool 

nprocs = 4 

pool = Pool(nprocs) 

for chunk in chunker(df, nprocs): 
    data = pool.map(myfunction, chunk) 
    data.domorestuff() 

我假定这应该是非常类似于使用IPython的分布式机械,但还没没有尝试过。

+0

那肯定会诀窍。我仍然有点想要一些整齐的单打比赛,但如果没有那样做,你会得到奖品:-) – 2014-09-06 17:21:28

7

的良好环境标志是很多的选择,所以我会从Anaconda Blaze添加这个,真用Odo

import blaze as bz 
import pandas as pd 

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]}) 

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2): 
    # Do stuff with chunked dataframe 
6

使用numpy的有这个建于:np.array_split()

import numpy as np 
import pandas as pd 

data = pd.DataFrame(np.random.rand(10, 3)) 
for chunk in np.array_split(data, 5): 
    assert len(chunk) == len(data)/5