6
截至2017年8月,Pandas DataFame.apply()不幸仍然局限于使用单核,这意味着当您运行df.apply(myfunc, axis=1)
时,多核机器将浪费大部分计算时间。如何在一台机器上利用Pandas Dataframe上的所有内核来并行化apply()?
如何使用所有内核并行运行数据帧?
截至2017年8月,Pandas DataFame.apply()不幸仍然局限于使用单核,这意味着当您运行df.apply(myfunc, axis=1)
时,多核机器将浪费大部分计算时间。如何在一台机器上利用Pandas Dataframe上的所有内核来并行化apply()?
如何使用所有内核并行运行数据帧?
最简单的方法是使用Dask's map_partitions。你需要这些进口(你需要pip install dask
):
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
和语法是
data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y,z, ...): return <whatever>
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
(我相信30是分区的适当数量的,如果你有16个内核)。只是为了完整性,我计时(16个内核)我的机器上的区别:
data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2'] )
t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))
28.16970546543598
t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))
2.708152851089835
t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))
0.010668013244867325
给予从熊猫去的因素10加速一个适用于DASK适用于分区。当然,如果你有一个可以进行矢量化的函数,你应该 - 在这种情况下,函数(y*(x**2+1)
)是简单的矢量化的,但是有很多东西是无法矢量化的。
很高兴知道,感谢发布。你能解释为什么你选择了30个分区吗?更改此值时性能是否发生变化? –
@AndrewL我假设每个分区都由一个单独的进程提供服务,并且我认为16个内核可以同时运行16个或32个进程。 我试了一下,性能似乎改善了多达32个分区,但进一步增加没有有益的影响。我假设用四核机器,你会想要8个分区,等等。 请注意,我注意到16和32之间有一些改进,所以我认为你确实需要2x $ NUM_PROCESSORS –