dask.compute(...)预计是阻止呼叫。但是,当我嵌套dask.compute,并且内部一个I/O(如dask.dataframe.read_parquet)时,内部dask.compute不会阻塞。下面是一个伪代码示例:如果我开始2名工人8个处理每个像嵌套dask.compute不会阻止
import dask, distributed
def outer_func(name):
files = find_files_for_name(name)
df = inner_func(files).compute()
# do work with df
return result
def inner_func(files):
tasks = [ dask.dataframe.read_parquet(f) for f in files ]
tasks = dask.dataframe.concat(tasks)
return tasks
client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])
:
dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1
,那么我会想到最多2×8并发inner_func运行,因为inner_func(文件) .compute()应该是阻塞的。然而,我观察到的是,在一个工作进程中,只要启动read_parquet步骤,就可能会有另一个inner_func(files).compute()开始运行。所以最后可能会有多个inner_func(files).compute()运行,并且有时可能会导致内存不足错误。
这是预期的行为?如果是这样,有什么办法强制每个工作进程一个inner_func(files).compute()?
这里似乎有点混乱。 dask.dataframe创建延迟对象,在延迟/计算的函数内创建/计算这些对象并不正常。考虑一下,这个函数被发送给一个工作者:你期望计算发生在哪里? – mdurant
在这个例子中的嵌套是相当典型的现实世界的数据流恕我直言。使用像dask DataFrame这样的分布式数据结构并不总是可行/可取的,这样我们可以避免这种嵌套。由于dask DataFrame API比pandas小,并且因为保持工作的串行代码版本非常重要。 从我看到的情况来看,inner_func似乎在dask-worker进程中的多个线程中运行,但我只为每个工作者指定一个线程,例如: dask-worker --scheduler-file sched.json --nprocs 3 - -nthreads 1 - 本地目录/ tmp / – user1527390