2017-06-30 16 views
1

我正在开发一个工具来刮取推文并对其进行处理,以便用户建立词频分析。由于需要处理的数据量很大,我将文字处理部分从推文刮取部分分离。是否有一种直观的方式来循环多处理.Connection对象直到达到EOF?

multiprocessing.Connectionrec, sen = multiprocessing.Pipe(False))提供了在进程间传输数据的有用工具。但是,当发送端显式调用Connection.close()时,我无法找到接收端可能会告诉EOF何时到达的实用程序。我试过:

def yielder(conn): 
    yield conn.recv() 

但是,这只是在返回管道中的第一个项目后停止。 目前与我绕过该问题的尝试,除了一个while True循环中的语句:

try: 
    status = rec.recv() 
    ...process data... 
except BrokenPipeError: 
    break 

我也看到这个可以通过发送一个特定的结束标志,让当它接收到接收端终止该进程完成。但是这些都是违反直觉和丑陋的做法,违反Python的禅宗:

美丽比丑陋更好。

...

应该有one--和最好只有一个--obvious办法做到这一点。

我错过了什么吗?有没有简单,优雅的方式,如C++的

while getline(istreamobject, line) 

来执行我的任务?

回答

0

您可以使用第二种形式的调用iteriter(callable, sentinel) -> iterator将其设置为for循环。尽管如此,你仍然必须抓住例外。

try: 
    for status in iter(conn.recv, None): 
     ... 
except BrokenPipeError: 
    pass 

如果不是关闭的管道,你发送一个“EOF”下的管道,你可以删除try/except并做for status in iter(conn.recv, 'EOF message'),当'EOF message'被收到(可以是任何东西),iter停止的循环。通常情况下,EOF消息是一个空字符串,所以经常可以看到喜欢的东西:

for line in iter(file.read, ''): 
    ... 

itertools recipes有这个功能称为iter_except。这基本上是你想用yielder功能

def iter_except(func, exception, first=None): 
    """ Call a function repeatedly until an exception is raised. 

    Converts a call-until-exception interface to an iterator interface. 
    Like builtins.iter(func, sentinel) but uses an exception instead 
    of a sentinel to end the loop. 

    Examples: 
     iter_except(functools.partial(heappop, h), IndexError) # priority queue iterator 
     iter_except(d.popitem, KeyError)       # non-blocking dict iterator 
     iter_except(d.popleft, IndexError)      # non-blocking deque iterator 
     iter_except(q.get_nowait, Queue.Empty)     # loop over a producer Queue 
     iter_except(s.pop, KeyError)        # non-blocking set iterator 

    """ 
    try: 
     if first is not None: 
      yield first()   # For database APIs needing an initial cast to db.first() 
     while True: 
      yield func() 
    except exception: 
     pass 

所以之前做什么,你也可以做这样的事情:

for status in iter_except(conn.recv, BrokenPipeError): 
    ... 

或者刚刚杀青的yielder功能:

def yielder(conn): 
    try: 
     while True: 
      yield conn.recv() 
    except BrokenPipeError: 
     pass 

for status in yielder(conn): 
    ... 
+0

yielder函数不会抛出任何异常。它只是产生管道中的第一个值然后停止。 – flymousechiu

+0

@flymousechiu它仍然只产生没有'while True'循环的第一个值吗? – Artyer

+0

是的,如果你尝试下面的话,就可以明显看出来:'import multiprocessing as mp','def yielder(conn):yield conn.recv()','rec,sen = mp.Pipe(False)','for i in范围(10):sen.send(i)','list(yielder(rec))'。应该返回[0] – flymousechiu