Multiprocessing - passing an array of dicts through shared memory
The following code works, but it is very slow because a large dataset is passed to each process. In the actual implementation, the time to create a process and send the data is almost the same as the calculation time, so by the time the second process is created, the first one has nearly finished its calculation, making the parallelization pointless.
The code is the same as in this question Multiprocessing has cutoff at 992 integers being joined as result, with the change suggested there working and implemented below. However, I assume this is a common problem others have run into: pickling large amounts of data takes a long time.
I have seen answers that pass a shared-memory array using multiprocessing.Array. I have an array of ~4000 indices, but each index holds a dictionary with 200 key/value pairs. The data is only read by each process, some calculation is done, and then a matrix (4000x3) is returned (without the dictionaries).
Answers such as Is shared readonly data copied to different processes for Python multiprocessing? use map. Is it possible to keep the structure below and still use shared memory? Is there an efficient way to send the data to each process with an array of dictionaries, such as wrapping the dictionaries in some manager and then putting that inside a multiprocessing.Array?
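For purely numeric data, one option (not part of the original post; the names here are illustrative) is multiprocessing.Array, which backs a ctypes buffer with shared memory so child processes can read it without per-access pickling. It cannot hold dictionaries directly, which is exactly the obstacle described above, but a minimal sketch of the read-only pattern looks like this:

```python
import multiprocessing

def worker(shared, start, end, result_q):
    # Each child reads its own slice of the shared buffer; the buffer
    # itself is not re-pickled on each access.
    result_q.put((start, sum(shared[start:end])))

def parallel_sums(values, nprocs=2):
    # lock=False returns a plain shared ctypes buffer, fine for read-only use.
    shared = multiprocessing.Array('d', values, lock=False)
    result_q = multiprocessing.Queue()
    chunk = len(values) // nprocs
    procs = []
    for i in range(nprocs):
        start = i * chunk
        end = (i + 1) * chunk if i < nprocs - 1 else len(values)
        p = multiprocessing.Process(target=worker,
                                    args=(shared, start, end, result_q))
        procs.append(p)
        p.start()
    results = dict(result_q.get() for _ in procs)
    for p in procs:
        p.join()
    return results

if __name__ == '__main__':
    print(parallel_sums(list(range(12))))
```

Because the buffer can only hold ctypes scalars, each dictionary would first have to be flattened into numeric fields, which may or may not be practical for this use case.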
import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0, 3000):
        total.append(data)
    for i in range(0, 200):
        data[str(i)] = i
    CalcManager(total, start=0, end=3000)

def CalcManager(myData, start, end):
    print 'in calc manager'

    # Multiprocessing
    # Set the number of processes to use.
    nprocs = 3
    # Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    # Set up an empty list to store our processes
    procs = []

    # Divide up the data for the set number of processes
    interval = (end - start) / nprocs
    new_start = start

    # Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        # Make sure we don't go past the size of the data
        if new_end > end:
            new_end = end
        # Generate a new process and pass it the arguments
        data = myData[new_start:new_end]
        # Create the process and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,
                                    args=(data, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        # Increment our next start to the current end
        new_start = new_end + 1

    print 'finished starting'

    # Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    # Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

# MultiProcess handling
def multiProcess(data, start, end, result_q, proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0, 22):
        results.append(temp)
    for j in range(0, 3):
        temp.append(j)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
Solved by simply passing the list of dictionaries to a Manager:

manager = Manager()
d = manager.list(myData)

It appears that the manager holding the list also manages the dictionaries that the list contains. The startup time is a bit slow, so it seems the data is still being copied, but the copy happens once at the start; within each process the data is then sliced.
import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0, 3000):
        total.append(data)
    for i in range(0, 100):
        data[str(i)] = i
    CalcManager(total, start=0, end=500)

def CalcManager(myData, start, end):
    print 'in calc manager'
    print type(myData[0])
    manager = Manager()
    d = manager.list(myData)

    # Multiprocessing
    # Set the number of processes to use.
    nprocs = 3
    # Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    # Set up an empty list to store our processes
    procs = []

    # Divide up the data for the set number of processes
    interval = (end - start) / nprocs
    new_start = start

    # Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        # Make sure we don't go past the size of the data
        if new_end > end:
            new_end = end
        # Generate a new process and pass it the arguments
        data = myData[new_start:new_end]
        # Create the process and pass the managed data and the result queue
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        # Increment our next start to the current end
        new_start = new_end + 1

    print 'finished starting'

    # Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    # Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

# MultiProcess handling
def multiProcess(data, start, end, result_q, proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0, 22):
        results.append(temp)
    for j in range(0, 3):
        temp.append(j)
    print len(data)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
Your code seems to suggest that for one input range you produce only one output. Is that what you want? – 2014-09-02 15:02:44
To use shared memory, you would have to convert your array of dicts into `ctypes` objects and then use [`multiprocessing.sharedctypes`](https://docs.python.org/2.7/library/multiprocessing.html#module-multiprocessing.sharedctypes). I'm not sure that's feasible for your use case. – dano 2014-09-02 17:03:03
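A rough sketch of what that conversion could look like, assuming (hypothetically) that every dictionary has the same fixed numeric fields so it can be flattened into a ctypes Structure; `Point`, `worker`, and `run` are illustrative names, not from the post:

```python
import multiprocessing
from multiprocessing import sharedctypes
from ctypes import Structure, c_double

class Point(Structure):
    # Hypothetical fixed schema standing in for one per-index dict.
    _fields_ = [('x', c_double), ('y', c_double)]

def worker(arr, start, end, result_q):
    # Children read the shared Structure array directly; the array
    # lives in shared memory and is not re-pickled per access.
    result_q.put(sum(p.x + p.y for p in arr[start:end]))

def run():
    # RawArray places the Structure instances in shared memory.
    arr = sharedctypes.RawArray(Point, [Point(i, 2 * i) for i in range(6)])
    result_q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(arr, s, e, result_q))
             for s, e in ((0, 3), (3, 6))]
    for p in procs:
        p.start()
    total = sum(result_q.get() for _ in procs)
    for p in procs:
        p.join()
    return total

if __name__ == '__main__':
    print(run())
```

Whether this helps depends on whether the 200 key/value pairs per index really can be reduced to a fixed numeric layout, as dano notes.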
@HaiVu It returns an array of floats for each process. results = range(0,(992)) was just a sample list. – user1938107 2014-09-02 23:43:46