我想使用Python(3.2),以多进程(Ubuntu的)解决了大规模的搜索问题。基本上我想取一个列表,取出第一个项目,找到与对象具有相同属性的所有其他项目,将找到的项目和目标项目加入到一个列表中,将它们从原始列表中删除,然后(循环)重做一次。多处理意味着在整个处理器之间分配工作。代码执行一次没有问题。事实上,它也会循环,因为例外情况被忽略了,似乎做得很好。但在30秒内,它几乎全部用完了16GB的RAM。它可能在python 3.2循环内进行多处理?
到目前为止,我的两个担忧是:1)我循环(并且我得到很多它们)后,立即得到“异常AssertionError:AssertionError('只能测试子进程',)在忽略中”。除此之外,还有大量的RAM使用(我认为这可能与此有关,不确定)。以及2)当我使用更大的数据集时,它似乎并没有进行并行搜索。
我的代码如下所示:
class triangleListWorker(multiprocessing.Process):
def __init__(self, work_queue, target, results,start):
super().__init__()
self.work_queue = work_queue
self.results = results
self.target = target
self.startIndex = start
def run(self):
while True:
try:
searching = self.work_queue.get()
self.do_search(searching)
finally:
self.work_queue.task_done()
def do_search(self,searching):
for x in range(len(searching)):
if self.target.same_plane(searching[x]):
self.results.append(self.startIndex+x)
我想要做的,是用来管理器()名单()来存储的目标对象和搜索的对象上也有相同的所有索引平面。
def do_multi_find_connections(self, target,searchList):
work_queue = multiprocessing.JoinableQueue()
#results= multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count()
results = multiprocessing.Manager().list()
range_per_process = len(searchList) // cpu_count
start,end = 0, range_per_process + (len(searchList) % cpu_count)
for i in range(cpu_count):
worker = triangleListWorker(work_queue,target,results,start)
worker.daemon = True
worker.start()
for x in range(cpu_count):
searchsub = [searchList[x] for x in range(start,end)]
work_queue.put(searchList[start:end])
#work_queue.put(searchList[start:end])
start,end = end, end + range_per_process
print(start,end)
work_queue.join()
print("can continue...")
return results
def find_connections(self, triangle_list,doMultiProcessing):
tlist = [x for x in triangle_list]
print("len tlist", len(tlist))
results = []
self.byPlane = []
if doMultiProcessing:
while len(tlist) > 0:
results = []
target = tlist[0]
#print("target",tcopy[0])
self.do_multi_find_connections(target,tlist)
results = self.do_multi_find_connections(target,tlist)#list of indexes
plane = []
print(len(results))
print(results)
for x in results:
plane.append(tlist[x])
new_tlist = [tlist[x] for x in range(len(tlist)) if not x in results]
print(len(new_tlist))
tlist = new_tlist
self.byPlane.append(plane)
## self.byPlane.append(plane)
## tlist = []
这个代码(也许有点难看)应该循环去寻找下一个平面,排气一切通过调用其上方的功能(它的多)是在平面上。
运行在Ubuntu 11.04 64,蟒3.2。
哎呀。行:除了“AssertionError”不应该在那里..这是我自己明天早上回到办公室时试试。 – Sterling
听起来像你正在产卵无限数量的进程和耗尽计算机RAM。在每个进程的开始处放置一个打印文件,并确保只有CPU_COUNT + 1进程产生。 –