2011-08-27 72 views
4

我想使用Python(3.2),以多进程(Ubuntu的)解决了大规模的搜索问题。基本上我想取一个列表,取出第一个项目,找到与对象具有相同属性的所有其他项目,将找到的项目和目标项目加入到一个列表中,将它们从原始列表中删除,然后(循环)重做一次。多处理意味着在整个处理器之间分配工作。代码执行一次没有问题。事实上,它也会循环,因为例外情况被忽略了,似乎做得很好。但在30秒内,它几乎全部用完了16GB的RAM。它可能在python 3.2循环内进行多处理?

到目前为止,我的两个担忧是:1)我循环(并且我得到很多它们)后,立即得到“异常AssertionError:AssertionError('只能测试子进程',)在忽略中”。除此之外,还有大量的RAM使用(我认为这可能与此有关,不确定)。以及2)当我使用更大的数据集时,它似乎并没有进行并行搜索。

我的代码如下所示:

class triangleListWorker(multiprocessing.Process): 
    def __init__(self, work_queue, target, results,start): 
     super().__init__() 
     self.work_queue = work_queue 
     self.results = results 
     self.target = target 
     self.startIndex = start 
    def run(self): 
     while True: 
      try: 
       searching = self.work_queue.get() 
       self.do_search(searching) 

      finally: 
       self.work_queue.task_done() 

    def do_search(self,searching): 
     for x in range(len(searching)): 
      if self.target.same_plane(searching[x]): 
       self.results.append(self.startIndex+x) 

我想要做的,是用来管理器()名单()来存储的目标对象和搜索的对象上也有相同的所有索引平面。

def do_multi_find_connections(self, target,searchList): 
     work_queue = multiprocessing.JoinableQueue() 
     #results= multiprocessing.Queue() 

     cpu_count = multiprocessing.cpu_count() 
     results = multiprocessing.Manager().list() 
     range_per_process = len(searchList) // cpu_count 
     start,end = 0, range_per_process + (len(searchList) % cpu_count) 
     for i in range(cpu_count): 
      worker = triangleListWorker(work_queue,target,results,start) 
      worker.daemon = True 
      worker.start() 
     for x in range(cpu_count): 
      searchsub = [searchList[x] for x in range(start,end)] 
      work_queue.put(searchList[start:end]) 
      #work_queue.put(searchList[start:end]) 
      start,end = end, end + range_per_process 
      print(start,end) 

     work_queue.join() 
     print("can continue...") 

     return results 

    def find_connections(self, triangle_list,doMultiProcessing): 
     tlist = [x for x in triangle_list] 
     print("len tlist", len(tlist)) 
     results = [] 
     self.byPlane = [] 
     if doMultiProcessing: 
      while len(tlist) > 0: 
       results = [] 
       target = tlist[0] 
       #print("target",tcopy[0]) 
       self.do_multi_find_connections(target,tlist) 

       results = self.do_multi_find_connections(target,tlist)#list of indexes 
       plane = [] 

       print(len(results)) 
       print(results) 
       for x in results: 
        plane.append(tlist[x]) 
       new_tlist = [tlist[x] for x in range(len(tlist)) if not x in results] 
       print(len(new_tlist)) 
       tlist = new_tlist 

       self.byPlane.append(plane) 

##    self.byPlane.append(plane) 
##    tlist = [] 

这个代码(也许有点难看)应该循环去寻找下一个平面,排气一切通过调用其上方的功能(它的多)是在平面上。

运行在Ubuntu 11.04 64,蟒3.2。

+0

哎呀。行:除了“AssertionError”不应该在那里..这是我自己明天早上回到办公室时试试。 – Sterling

+0

听起来像你正在产卵无限数量的进程和耗尽计算机RAM。在每个进程的开始处放置一个打印文件,并确保只有CPU_COUNT + 1进程产生。 –

回答

1

而不是使用循环,我认为multiprocessing模块的预期模式是创建一个Pool并使用Pool.map_async方法。 IOW,将你的循环转换成某种迭代器(可能是generator方法)。然后在通过您do_search方法是等效的功能,你的迭代器map_async

+0

我不确定它会为我想要的工作。我确实使用了一个池来编写它,但它仍然有效,但我仍然需要循环,因为我必须将列表中的所有内容都分组到我正在搜索的内容中。我的方法是取第一个项目,然后搜索整个列表(多处理)以找到属于它的所有东西(即在同一个平面上)。我用循环来抓另一个目标。虽然这种方法没有产生任何错误,但它似乎并没有使用我的处理器,我发现它很好奇。此外,它使用大约20GB的RAM,这将是很好,但在Linux上的SWAP似乎并没有得到释放 – Sterling

+0

并且谢谢。 (对不起,用完字符)。如果我今天没有写出更好的解决方案,我会使用这个解决方案:)。 – Sterling

0

您可以使用池类多:

from multiprocessing import Pool 
pool = Pool(processes=5) 
valuesProcessed = pool.map(someFunction, valuesToProcess)