2017-10-20 72 views
0

我想读取浮点数字流,做一些简单的计算并将该值附加到全局列表中。你能说出我错了吗?该列表不附加。更新Dask中的全局列表

from random import random 
from time import sleep 

def process(x): 
    from random import random 
    sleep(random()*2) 
    t = x * 2 
    processed_queue.append(t) 
    print(processed_queue) 
    return t 

if __name__ == "__main__": 

    from distributed import Client 
    from queue import Queue 

    client = Client() 

    processed_queue = [] 
    input_q = Queue() 

    remote_q = client.scatter(input_q) 
    processed_q = client.map(process, remote_q) 
    result_q = client.gather(processed_q) 

    for i in [random() for x in range(100)]: 
     sleep(random()) 
     input_q.put(i) 
     print(i) 
     print(processed_queue) 
     print(result_q.qsize()) 

回答

0

Whilest queue.Queuemultiprocessing.Queue可以用来发送线程和进程,通常这种编程逐副作用的不被DASK鼓励模型之间的数据。

您可以将数据传递给群集执行的函数,并使用client.submit实时获得它们的返回值,那么队列会为您做些什么,否则您无法做到这一点?另外,还有一些dask构造,比如共享变量,可能可以做到这一点,但(再次)很少使用,我认为你不太可能成为正确的范例。

由于代码不适用于您的具体原因,因此:Client()为调度程序创建至少一个独立进程,为具有一个或多个线程的工作程序创建至少一个进程(请参阅您的任务管理器,顶层或其他系统 - 观看工具)。 queue.Queue是流程本地的,因此每个进程都会看到空的队列并添加到该进程中,但是在主进程中没有看到该信息,并且在工作人员中看不到输入队列上的操作。