2017-10-13 73 views
1

我有一个hive格式和快速压缩的parquet文件。它适合内存,pandas.info提供以下数据。在dask分布式fastparquet处理时间不一致

在拼花文件每组的行数仅仅是100K

>>> df.info() 
<class 'pandas.core.frame.DataFrame'> 
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM= 
Data columns (total 8 columns): 
payment_method_id   int16 
payment_plan_days   int16 
plan_list_price   int16 
actual_amount_paid  int16 
is_auto_renew    bool 
transaction_date   datetime64[ns] 
membership_expire_date datetime64[ns] 
is_cancel     bool 
dtypes: bool(2), datetime64[ns](2), int16(4) 
memory usage: 698.7+ MB 

现在,做一些简单的计算与DASK我得到以下计时

使用线程

>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime() 
'Fri Oct 13 23:44:50 2017' 
141.98732048354384 
'Fri Oct 13 23:44:59 2017' 

使用分布式(本地簇)

>>> c=Client() 
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime() 
'Fri Oct 13 23:47:04 2017' 
141.98732048354384 
'Fri Oct 13 23:47:15 2017' 
>>> 

这很好,每个约9秒。

现在使用多,来这里的惊喜...

>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime() 
'Fri Oct 13 23:50:43 2017' 
141.98732048354384 
'Fri Oct 13 23:57:49 2017' 
>>> 

我希望多和分布式/本地集群是相同数量级的有可能与线程一些差异上(或好或坏)

但是,多处理多花了47倍的时间来在in16柱上做出简单的平均值?

我的环境只是一个新鲜的conda安装与所需的模块。没有任何东西的手艺。

为什么会有这种差异?我无法管理dask/distributed以具有可预测的行为,以便能够根据我的问题的性质在不同的调度程序之间进行明智的选择。

这只是一个玩具的例子,但我一直无法得到一个与我的期望相一致的例子(正如我对阅读文档的理解一样)。

有什么我应该留在我的脑海里?还是我完全错过了这一点?

感谢

JC

回答

1

与螺纹调度,每个任务都有访问过程的所有记忆 - 所有在此情况下,数据的 - 因此可以做它的计算没有任何记忆复制。

对于分布式调度程序,调度程序知道哪个线程和哪个工作程序正在生成后续任务所需的数据,或者已经在内存中拥有该数据。调度程序的聪明性专门用于将计算移动到正确的工作人员,以避免数据通信和复制。

相反,多进程调度程序倾向于将任务结果发送到主进程或从主进程发送任务结果,这可能涉及很多序列化和复制。有些任务可以融合在一起(通过在链中调用多个python函数来组合任务),但有些不能。任何序列化和复制都需要CPU的努力,对你来说可能更重要的是内存空间。如果您的原始数据占系统总量的很大一部分,那么您可能会填满物理内存,导致大的因素减慢。

+0

是的,经过一些摔跤和大量的手册重读,我实际上得出了同样的结论。很多学习;-) –