2015-03-24 53 views
1

我想用下面的代码解析一个巨大的文件(大约23 MB),其中我使用从文件中读取的所有行填充multiprocessing.manager.list。在每个进程的目标例程(parse_line)中,我弹出一行并解析它以创建具有特定解析属性的defaultdict对象,最后将这些对象中的每一个都推送到另一个multiprocessing.manager.list中。用于解析大文件的多处理和共享多处理管理器列表

class parser(object): 
def __init__(self): 
    self.manager = mp.Manager() 
    self.in_list = self.manager.list() 
    self.out_list = self.manager.list() 
    self.dict_list,self.lines, self.pcap_text = [],[],[] 
    self.last_timestamp = [[(999999,0)]*32]*2 
    self.num = Word(nums) 
    self.word = Word(alphas) 
    self.open_brace = Suppress(Literal("[")) 
    self.close_brace = Suppress(Literal("]")) 
    self.colon = Literal(":") 
    self.stime = Combine(OneOrMore(self.num + self.colon) + self.num + Literal(".") + self.num) 
    self.date = OneOrMore(self.word) + self.num + self.stime 
    self.is_cavium = self.open_brace + (Suppress(self.word)) + self.close_brace 
    self.oct_id = self.open_brace + Suppress(self.word) + Suppress(Literal("=")) \ 
      + self.num + self.close_brace 
    self.core_id = self.open_brace + Suppress(self.word) + Suppress(Literal("#")) \ 
      + self.num + self.close_brace 
    self.ppm_id = self.open_brace + self.num + self.close_brace 
    self.oct_ts = self.open_brace + self.num + self.close_brace 
    self.dump = Suppress(Word(hexnums) + Literal(":")) + OneOrMore(Word(hexnums)) 
    self.opening = Suppress(self.date) + Optional(self.is_cavium.setResultsName("cavium")) \ 
      + self.oct_id.setResultsName("octeon").setParseAction(lambda toks:int(toks[0])) \ 
      + self.core_id.setResultsName("core").setParseAction(lambda toks:int(toks[0])) \ 
      + Optional(self.ppm_id.setResultsName("ppm").setParseAction(lambda toks:int(toks[0])) \ 
      + self.oct_ts.setResultsName("timestamp").setParseAction(lambda toks:int(toks[0]))) \ 
      + Optional(self.dump.setResultsName("pcap")) 

def parse_file(self, filepath): 
    self.filepath = filepath 
    with open(self.filepath,'r') as f: 
     self.lines = f.readlines() 
     for lineno,line in enumerate(self.lines): 
      self.in_list.append((lineno,line)) 
     processes = [mp.Process(target=self.parse_line) for i in range(mp.cpu_count())] 
     [process.start() for process in processes] 
     [process.join() for process in processes] 

    while self.in_list: 
     (lineno, len) = self.in_list.pop() 
     print mp.current_process().name, "start" 
     dic = defaultdict(int) 
     result = self.opening.parseString(line) 
     self.pcap_text.append("".join(result.pcap)) 
     if result.timestamp or result.ppm: 
      dic['oct'], dic['core'], dic['ppm'], dic['timestamp'] = result[0:4] 
      self.last_timestamp[result.octeon][result.core] = (result.ppm,result.timestamp) 
     else: 
      dic['oct'], dic['core'] = result[0:2] 
      dic['ppm'] = (self.last_timestamp[result.octeon][result.core])[0] 
      dic['ts'] = (self.last_timestamp[result.octeon][result.core])[1] 
     dic['line'] = lineno 
     self.out_list.append(dic) 

然而,这整个过程大约需要3分钟才能完成。

我的问题是,如果有更好的方法来提高速度?

我使用pyparsing模块来解析每一行,如果它有任何区别。

注:在日常保麦圭尔的建议

回答

2

不是一个大的性能问题所做的更改,但学会文件直接迭代,而不是使用readlines方法()。代替此代码:

self.lines = f.readlines() 
    for lineno,line in enumerate(self.lines): 
     self.in_list.append((lineno,line)) 

你可以写:

self.in_list = list(enumerate(f)) 

一个隐藏的性能杀手使用while self.in_list: (lineno,line) = list.pop()。每次调用弹出窗口都会从列表中删除第0个元素。不幸的是,Python的列表是作为数组实现的。要移除第0个元素,第1..n-1个元素必须在阵列中向上移动一个位置。你不是真的摧毁self.in_list你去,只是遍历它:

for lineno, line in self.in_list: 
    <Do something with line and line no. Parse each line and push into out_list> 

如果您认为消费self.in_list因为你走的是节省内存的措施,那么就可以避免而是通过使用deque(Python提供的集合模块)来改变Python列表的数组移位效率。 deque的内部实现为链接列表,因此从任一端推送或弹出都非常快,但索引访问速度很慢。要使用双端队列,更换线路:

self.in_list = list(enumerate(f)) 

有:

self.in_list = deque(enumerate(f)) 

然后用self.in_list.popleft()调用替换在你的代码self.in_list.pop()

但更可能是性能问题是你用来处理每一行的pyparsing代码。但是由于您没有发布解析器代码,因此我们可以在那里提供很多帮助。

要想知道时间在哪里,请尝试离开您的所有代码,然后注释<Do something with line and line no. Parse each line and push into out_list>代码(您可能必须为for循环添加pass语句),然后针对您的23MB文件运行。这会给你一个粗略的想法,你有多少时间花在阅读和迭代文件上,以及花费了多少时间来进行实际的解析。然后,当你发现真正的性能问题在哪里时,回到另一个问题。

+0

我使用list的原因是我正在使用multiprocessing manager模块将self.in_list作为代理列表进行初始化。我需要弹出这些元素,因为多个进程正在访问这个列表并从中读取一个元素。我已经用pyparsing部分更新了代码。我意识到这是最大的瓶颈,但我希望在使用多处理时逐行读取更好更高效的方法。 – nitimalh 2015-03-25 03:27:29