2017-10-17 72 views
1

TL;博士:

是否有可能.set_index()方法在几个DASK Dataframes 并行 同时?或者,是否有可能在几个Dask数据帧上懒洋洋地插入.set_index(),因此会导致 并行地被设置为 我可以.set_index()懒惰(或要并发执行),在Dask Dataframes?

下面是这种情况:

  • 我有几个时间序列
  • 每个时间序列存储的是几个.csv文件。每个文件都包含与特定日期相关的数据。此外,文件分散在不同的文件夹中(每个文件夹包含一个月的数据)
  • 每个时间序列具有不同的采样率
  • 所有时间序列具有相同的列。所有列都包含DateTime等。
  • 数据太大而无法在内存中处理。这就是我使用Dask的原因。
  • 我想将所有时间序列合并到一个DataFrame中,并由DateTime对齐。为此,我需要首先将每个时间序列的所有时间序列设为resample(),并将其设置为常见采样率。然后.join()所有时间系列。
  • .resample()只能应用于索引。因此,在重新采样之前,我需要在每个时间序列的DateTime列上输入.set_index()
  • 在一个时间系列询问.set_index()方法的时候,立刻开始计算。这导致我的代码被阻止并等待。此时,如果检查我的机器资源使用情况,我可以看到许多内核正在使用,但使用率不会超过〜15%。这使我认为,理想情况下,我可以将.set_index()方法同时应用于多个时间序列。

达到上述情况后,我已经尝试了一些不优雅的解决方案并行的几个时间序列.set_index()方法的应用(例如创建multiprocessing.Pool),这是没有成功。在给出更多细节之前,是否有一个清晰的方法来解决上述问题?上述情况是否在实施Dask时有所考虑?

或者,是否可以延迟.set_index()?如果.set_index()方法可以延迟应用,我会创建一个完整的计算图与上述步骤和最后,一切将并行计算 并行 (我认为)。

回答

0

Dask.dataframe需要知道数据帧的所有分区的最小值和最大值,以便合理地执行并行的日期时间操作。默认情况下,它会读取一次数据,以找到好的分区。如果数据没有排序,它会然后做一个洗牌(可能非常昂贵)来排序

在你的情况,这听起来像你的数据已经排序,你可能能够明确提供这些。你应该看看最后一个例子dd.DataFrame.set_index docstring

A common case is when we have a datetime column that we know to be 
    sorted and is cleanly divided by day. We can set this index for free 
    by specifying both that the column is pre-sorted and the particular 
    divisions along which is is separated 

    >>> import pandas as pd 
    >>> divisions = pd.date_range('2000', '2010', freq='1D') 
    >>> df2 = df.set_index('timestamp', sorted=True, divisions=divisions) # doctest: +SKIP 
+0

谢谢你的及时回复@MRocklin。我忘了说。我用'sorted = True'尝试过,但它仍然非常慢(我在谈论大约5年的数据,在很多情况下采样率小于1秒)。即使在'len()'与分区数相匹配时,我也努力争取使用“divisions = divisions”。但我仍然没有足够的时间进一步调查这个问题。但是,总结一下,从你的评论中,你可以说我不能在2个Dataframes上并行轻松调用'set_index()'?即使他们根本不相互关联? –

+0

如果您的数据已分类并且您知道分部,那么您可以轻松调用set_index。您可能想了解有关部门的更多信息:http://dask.pydata.org/en/latest/dataframe-design.html#partitions – MRocklin