2017-09-03 139 views
1

我正在用Dask.delayed取得良好进展。作为一个团队,我们决定花更多时间使用Dask来处理图。将图发布到跨集群节点

我有一个关于分配的问题。我在集群中看到以下行为。我开始每个8个节点上有8个工作人员,每个工作人员有4个线程,然后说我然后client.compute 8个图形创建模拟数据以供后续处理。我想让8个数据集每个节点生成一个。然而,似乎发生的是,并非不合理的是,这八个函数在前两个节点上运行。随后的计算在第一个和第二个节点上运行。因此我认为缺乏缩放。随着时间的推移,其他节点将从诊断工作页面中消失。这是预期的吗?

所以我想先按节点分配数据创建函数。所以,当我想计算图表,现在我做:

if nodes is not None: 
    print("Computing graph_list on the following nodes: %s" % nodes) 
    return client.compute(graph_list, sync=True, workers=nodes, **kwargs) 
else: 
    return client.compute(graph_list, sync=True, **kwargs) 

这似乎是正确设置:诊断进度条显示,我的数据创建功能在内存中,但它们不启动。如果省略节点,则计算按预期进行。在群集和我的桌面上都会发生此行为。

更多信息:看看调度程序日志,我确实看到通信失败。

more dask-ssh_2017-09-04_09\:52\:09/dask_scheduler_sand-6-70\:8786.log 
distributed.scheduler - INFO - ----------------------------------------------- 
distributed.scheduler - INFO - Scheduler at: tcp://10.143.6.70:8786 
distributed.scheduler - INFO -  bokeh at:    0.0.0.0:8787 
distributed.scheduler - INFO -  http at:    0.0.0.0:9786 
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-ny4ev7qh 
distributed.scheduler - INFO - ----------------------------------------------- 
distributed.scheduler - INFO - Register tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Register tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Register tcp://10.143.7.66:42162 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.66:42162 
distributed.scheduler - INFO - Register tcp://10.143.7.65:35114 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.65:35114 
distributed.scheduler - INFO - Register tcp://10.143.6.70:43208 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.70:43208 
distributed.scheduler - INFO - Register tcp://10.143.7.67:45228 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.67:45228 
distributed.scheduler - INFO - Register tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Register tcp://10.143.7.68:41915 
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.68:41915 
distributed.scheduler - INFO - Receive client connection: 5d1dab2a-914e-11e7-8bd1-180373ff6d8b 
distributed.scheduler - INFO - Worker 'tcp://10.143.6.71:46656' failed from closed comm: Stream is clos 
ed 
distributed.scheduler - INFO - Remove worker tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Removed worker tcp://10.143.6.71:46656 
distributed.scheduler - INFO - Worker 'tcp://10.143.6.73:36810' failed from closed comm: Stream is clos 
ed 
distributed.scheduler - INFO - Remove worker tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Removed worker tcp://10.143.6.73:36810 
distributed.scheduler - INFO - Worker 'tcp://10.143.6.72:36100' failed from closed comm: Stream is clos 
ed 
distributed.scheduler - INFO - Remove worker tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Removed worker tcp://10.143.6.72:36100 
distributed.scheduler - INFO - Worker 'tcp://10.143.7.67:45228' failed from closed comm: Stream is clos 
ed 
distributed.scheduler - INFO - Remove worker tcp://10.143.7.67:45228 
distributed.scheduler - INFO - Removed worker tcp://10.143.7.67:45228 
(arlenv) [[email protected] performance]$ 

这是否会引起任何可能的原因?

感谢, 添

回答

0

DASK如何选择任务分配给工人是复杂的,并考虑到像负载均衡,数据传输,资源约束等许多问题。它可以很难推论哪里事情最终会没有一个具体而简单的例子。

你可以尝试的一件事就是一次提交所有的计算,这可以让调度程序做出稍微更明智的决定,而不是一次只看一个事物。

所以,你可以尝试更换这样的代码:

futures = [client.compute(delayed_value) for delayed_value in L] 
wait(futures) 

与这样的代码

futures = client.compute(L) 
wait(futures) 

但老实说,我只给这个解决你的问题有30%的机会。如果不深入分析问题,很难知道发生了什么。如果你可以提供一个简单的可重复代码示例,那么这将是最好的。

+0

为了让分配工作,我必须从名称翻译到IP地址。 –

+0

我的问题undersubscribes内存,这样即使所有的数据被移动到一个节点(从16),它仍然只有25%满。因此,我从8个节点/ 16个线程和几个百分比的内存开始,将其缩小到单个工作上。我需要更多地考虑如何着手。 –

+0

我认为这与内存大小无关。看看调度程序日志。 –