2017-08-09 119 views
0

dask.compute(...)预计是阻止呼叫。但是,当我嵌套dask.compute,并且内部一个I/O(如dask.dataframe.read_parquet)时,内部dask.compute不会阻塞。下面是一个伪代码示例:如果我开始2名工人8个处理每个像嵌套dask.compute不会阻止

import dask, distributed 

def outer_func(name): 
    files = find_files_for_name(name) 
    df = inner_func(files).compute() 
    # do work with df 
    return result 

def inner_func(files): 
    tasks = [ dask.dataframe.read_parquet(f) for f in files ] 
    tasks = dask.dataframe.concat(tasks) 
    return tasks 

client = distributed.Client(scheduler_file=...) 
results = dask.compute([ dask.delay(outer_func)(name) for name in names ]) 

dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1 

,那么我会想到最多2×8并发inner_func运行,因为inner_func(文件) .compute()应该是阻塞的。然而,我观察到的是,在一个工作进程中,只要启动read_parquet步骤,就可能会有另一个inner_func(files).compute()开始运行。所以最后可能会有多个inner_func(files).compute()运行,并且有时可能会导致内存不足错误。

这是预期的行为?如果是这样,有什么办法强制每个工作进程一个inner_func(files).compute()?

+0

这里似乎有点混乱。 dask.dataframe创建延迟对象,在延迟/计算的函数内创建/计算这些对象并不正常。考虑一下,这个函数被发送给一个工作者:你期望计算发生在哪里? – mdurant

+0

在这个例子中的嵌套是相当典型的现实世界的数据流恕我直言。使用像dask DataFrame这样的分布式数据结构并不总是可行/可取的,这样我们可以避免这种嵌套。由于dask DataFrame API比pandas小,并且因为保持工作的串行代码版本非常重要。 从我看到的情况来看,inner_func似乎在dask-worker进程中的多个线程中运行,但我只为每个工作者指定一个线程,例如: dask-worker --scheduler-file sched.json --nprocs 3 - -nthreads 1 - 本地目录/ tmp / – user1527390

回答

0

对于多进程调度程序,这似乎不是这种情况。

为了使用分布式调度程序,我通过使用分布式。客户端API而不是依靠dask.compute来使用节奏作业提交找到了解决方法。 dask.compute适用于简单用例,但显然没有一个好主意可以安排多少个未完成的任务,因此在这种情况下会超出系统。

下面是的dask.Delayed任务的集合与起搏运行的伪代码:

import distributed as distr 

def paced_compute(tasks, batch_size, client): 
    """ 
    Run delayed tasks, maintaining at most batch_size running at any 
    time. After the first batch is submitted, 
    submit a new job only after an existing one is finished, 
    continue until all tasks are computed and finished. 

    tasks: collection of dask.Delayed 
    client: distributed.Client obj 
    """ 
    results, tasks = [], list(tasks) 
    working_futs = client.compute(tasks[:batch_size]) 
    tasks = tasks[batch_size:] 
    ac = distr.as_completed(working_futs) 
    for fut in ac: 
     res = fut.result() 
     results.append(res) 
     if tasks: 
      job = tasks.pop() 
      ac.add(client.compute(job)) 
    return results 
0

当你问分布式调度的DASK运行工作,它的函数船舶代码,任何数据对于处于不同进程中的工作者功能(可能在不同的机器上)是必需的。这些工作进程忠实地执行这些函数,并以正常的python代码运行。重点是,运行函数不知道它在一个dask worker上 - 它默认会看到没有设置全局的dask分布式客户端,并且执行dask通常为这种情况做的事情:执行任何dask默认调度程序(线程化)上的工作负载。

如果您确实必须在任务内执行完整的dask-compute操作,并且希望这些操作使用运行这些任务的分布式调度程序,则需要使用worker client。不过,我觉得在你的情况下,重新执行这个工作以删除嵌套(类似上面的伪代码,尽管这也可以与计算一起工作)可能是更简单的方法。