我想用下面的代码解析一个巨大的文件(大约23 MB),其中我使用从文件中读取的所有行填充multiprocessing.manager.list。在每个进程的目标例程(parse_line)中,我弹出一行并解析它以创建具有特定解析属性的defaultdict对象,最后将这些对象中的每一个都推送到另一个multiprocessing.manager.list中。用于解析大文件的多处理和共享多处理管理器列表
class parser(object):
def __init__(self):
self.manager = mp.Manager()
self.in_list = self.manager.list()
self.out_list = self.manager.list()
self.dict_list,self.lines, self.pcap_text = [],[],[]
self.last_timestamp = [[(999999,0)]*32]*2
self.num = Word(nums)
self.word = Word(alphas)
self.open_brace = Suppress(Literal("["))
self.close_brace = Suppress(Literal("]"))
self.colon = Literal(":")
self.stime = Combine(OneOrMore(self.num + self.colon) + self.num + Literal(".") + self.num)
self.date = OneOrMore(self.word) + self.num + self.stime
self.is_cavium = self.open_brace + (Suppress(self.word)) + self.close_brace
self.oct_id = self.open_brace + Suppress(self.word) + Suppress(Literal("=")) \
+ self.num + self.close_brace
self.core_id = self.open_brace + Suppress(self.word) + Suppress(Literal("#")) \
+ self.num + self.close_brace
self.ppm_id = self.open_brace + self.num + self.close_brace
self.oct_ts = self.open_brace + self.num + self.close_brace
self.dump = Suppress(Word(hexnums) + Literal(":")) + OneOrMore(Word(hexnums))
self.opening = Suppress(self.date) + Optional(self.is_cavium.setResultsName("cavium")) \
+ self.oct_id.setResultsName("octeon").setParseAction(lambda toks:int(toks[0])) \
+ self.core_id.setResultsName("core").setParseAction(lambda toks:int(toks[0])) \
+ Optional(self.ppm_id.setResultsName("ppm").setParseAction(lambda toks:int(toks[0])) \
+ self.oct_ts.setResultsName("timestamp").setParseAction(lambda toks:int(toks[0]))) \
+ Optional(self.dump.setResultsName("pcap"))
def parse_file(self, filepath):
self.filepath = filepath
with open(self.filepath,'r') as f:
self.lines = f.readlines()
for lineno,line in enumerate(self.lines):
self.in_list.append((lineno,line))
processes = [mp.Process(target=self.parse_line) for i in range(mp.cpu_count())]
[process.start() for process in processes]
[process.join() for process in processes]
while self.in_list:
(lineno, len) = self.in_list.pop()
print mp.current_process().name, "start"
dic = defaultdict(int)
result = self.opening.parseString(line)
self.pcap_text.append("".join(result.pcap))
if result.timestamp or result.ppm:
dic['oct'], dic['core'], dic['ppm'], dic['timestamp'] = result[0:4]
self.last_timestamp[result.octeon][result.core] = (result.ppm,result.timestamp)
else:
dic['oct'], dic['core'] = result[0:2]
dic['ppm'] = (self.last_timestamp[result.octeon][result.core])[0]
dic['ts'] = (self.last_timestamp[result.octeon][result.core])[1]
dic['line'] = lineno
self.out_list.append(dic)
然而,这整个过程大约需要3分钟才能完成。
我的问题是,如果有更好的方法来提高速度?
我使用pyparsing模块来解析每一行,如果它有任何区别。
注:在日常保麦圭尔的建议
我使用list的原因是我正在使用multiprocessing manager模块将self.in_list作为代理列表进行初始化。我需要弹出这些元素,因为多个进程正在访问这个列表并从中读取一个元素。我已经用pyparsing部分更新了代码。我意识到这是最大的瓶颈,但我希望在使用多处理时逐行读取更好更高效的方法。 – nitimalh 2015-03-25 03:27:29