2017-05-05 105 views
3

当在包含numpy数组的dask.bag上执行foldby时,我从dask/numpy得到非常无意义的FutureWarning消息。dask bag foldby with numpy arrays

def binop(a, b): 
    print('binop') 
    return a + b[1] 

def combine(a, b): 
    print('combine') 
    return a + b[1] 

seq = ((np.random.randint(0, 5, size=1)[0], np.ones(5,)) for _ in range(50)) 
db.from_sequence(seq, partition_size=10)\ 
    .foldby(0, binop=binop, initial=np.zeros(5,), combine=combine)\ 
    .compute() 

目标是加起来一堆NumPy数组。这会产生正确的结果,但也产生FutureWarning消息(看起来像每个分区一个)NumPy虽然它看起来好像他们来自dask

dask/async.py:247:FutureWarning:elementwise comparison failed;返回标代替,但在未来将执行的elementwise比较 回报FUNC(* args2)

只是增加了两个numpy阵列,而不dask不会产生这样有明确的一些参与与并行.foldby这里。看起来在任何计算完成之前都会生成警告。

  • 如何确定警告是否应该关注?
  • 如果我应该关注它,我该如何让警告消失?

我使用python 3.6 DASK 0.14.1和numpy的1.12.1

dask.bag.foldby


UPDATE

感谢@ MRocklin的答案,我开始寻找到这个多一点。因此,在dask.async.py有问题的代码是this

def _execute_task(arg, cache, dsk=None): 
.... 
    if isinstance(arg, list): 
     return [_execute_task(a, cache) for a in arg] 
    elif istask(arg): 
     func, args = arg[0], arg[1:] 
     args2 = [_execute_task(a, cache) for a in args] 
     return func(*args2) 

是有可能,dask实际上是在试图遍历numpy数组中args2 = [_execute_task(a, cache) for a in args],我不知道内部不够好(在所有的时候)的判断这些变量包含的内容。

回答

2

这有事情做与cytoolz.itetoolz.reducebyinit值。将init从init=np.zeros((5,))更改为init=lambda: np.zeros((5,))至少可以摆脱警告消息。

该警告是通过this line

cpdef dict reduceby(object key, object binop, object seq, object init='__no__default__'): 
... 
    cdef bint skip_init = init == no_default 

,其比较在初始化值(np.zeros((5,)))传递给使numpy失败的carraystr元件两比较字符串"__no__default__"生产。

所以回答我的问题:

  • 没有,你不需要担心这个警告,但它可能会在计划在未来
  • 干脆用放慢避免该警告可调用的init
  • 它似乎并没有这个会不会有什么重大的负面影响,但要记住的是,init调用将每一次执行过程被称为
2

这个警告确实来自numpy。通过代码基地产量these lines快速搜索:

if (!strcmp(ufunc_name, "equal") || 
      !strcmp(ufunc_name, "not_equal")) { 
     /* Warn on non-scalar, return NotImplemented regardless */ 
     assert(nin == 2); 
     if (PyArray_NDIM(out_op[0]) != 0 || 
       PyArray_NDIM(out_op[1]) != 0) { 
      if (DEPRECATE_FUTUREWARNING(
        "elementwise comparison failed; returning scalar " 
        "instead, but in the future will perform elementwise " 
        "comparison") < 0) { 
       return -1; 
      } 
     } 

DASK可能使这一点变得更糟,因为你会发现在每一个过程得到警告一次(dask.bag使用进程池默认情况下)。

另外,如果你的计算是通过numpy的束缚,那么你可以考虑切换到线程调度,而不是多重调度

mybag.compute(get=dask.threaded.get) 

http://dask.pydata.org/en/latest/scheduler-choice.html

+0

我仍然不明白,虽然'dask'正在做的阵列产生的警告。该案例的评论为:“这个条件基本上意味着”我们注定要失败“,b/c”灵活的“dtypes - 字符串和无效 - 不能有自己注册的ufunc循环...。这是没有意义的,因为数组的'dtype'是'float64',而不是'string'或'void'。请参阅上面修改的问题 –

+0

Dask.bag只是调用您提供的功能。它没有引入任何特殊的逻辑。 – MRocklin

+0

我正确地认为,如果'init'是一个可调用的函数,它会为每个执行器调用一次吗?这就是'cytoolz'代码的样子 –