2012-03-19 246 views
0

作为this question的后续行动,我试图通过使用生成器xrange(int(1e8))来规避以range(int(1e8))为例的列表构建。 xrange只是一个产生长序列值的过程的例子。 (请认为它不容易被复制。)更多的背景是,我有一长串的时间戳/值对,我想对它们做一些处理(时间序列)。我尽量避免将这些内容拉到整个内存中,因为这是很多数据。Python生成器的多个客户端?

我认为这很酷,如果我可以将多个处理单元同时应用到由我的生成器生成的这个数据流中。第一个想法是使用itertools.tee(),例如:

from itertools import tee 
g1,g2 = tee(xrange(int(1e8)),2) 
sum(g1), sum(g2) 

但后来我发现,只有第一sum()将使用发电机,而tee()内部再建立一个list(我想避免的。)。

所以我想,我需要一个异步解决方案,即每个sum()每个生成器步骤都会进行更新。 是来记东西的地方

但我既没有真正使用之前,部分我甚至不能告诉方法是否可以正常工作,或者是有效的/有效的/高效的。

从这一点,我很乐意欣赏观众的任何建议!


更新

我想避免callback based solution,因为它apparantly降低性能显著(这是它是如何目前实施)。我加入了以下一些剖析(请加注释,如果测试是不客观的):

class SinkA: 
    def __init__(self, src): 
    for i in src: pass 

class SinkB: 
    def f(self,i): 
    pass 

class Source: 
    def __iter__(self): 
    for i in xrange(int(1e4)): 
     yield i 

def t1(): 
    src = Source() 
    snk = SinkA(src) 

def t2(): 
    src = Source() 
    snk = SinkB() 
    for i in src: snk.f(i) 

if __name__ == "__main__": 
    from timeit import Timer 
    n = 1000 
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass 
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass 

更新2

我还能说什么?我有这个基于回调的解决方案,看起来效率不高。基于生成器的方法似乎很有前途,但我对这种编程经验太少,特别是当涉及更复杂的协同程序或扭曲的库时。总而言之,我有多个消费者用于生成大量数据的过程,并且我发现了一些潜在的方法。现在我正在寻找有经验的用户可能已经完成类似任务的合格语句。说明哪种方法可能是适当的,以及这些方法如何相互关联。或者我可能错过了其他的方法。

+1

你真的不解决这个问题:你希望每个消费者看到完全相同的数据,或没有? – Marcin 2012-03-19 12:17:27

+0

我猜你用'tee'看到的行为是因为你没有并行运行你的两个任务。 Python首先执行'sum(g1)',然后'sum(g2)'。尝试使用循环手动执行你的总和,并查看它是否消耗为可读内存。 – 2012-03-19 12:25:49

+0

@CharlesBrunet,那是真的。我想以某种方式抽象掉这个手动循环。要有更好的代码。 – moooeeeep 2012-03-19 12:37:26

回答

5

作为一个通用的方法,我会回调更换发电机的拉模型,和可能,包裹发电机,就像这样:

def walk(gen, callbacks): 
    for item in gen: 
     for f in callbacks: 
      f(item) 

如果你的处理器是在单独的线程,你想他们阻止等待,您可以注册Queue.put(或任何等效的)作为每个处理器的回调,并独立轮询这些队列。如果您需要,这将允许您使用广播和工作者模型。

编辑

另一种解决方案是使用coroutines

def source(self, *dests): 
    for i in xrange(int(1e4)): 
     for dest in dests: 
      dest.send(i) 

def sink(): 
    while True: 
     i = yield 

def t3(): 
    snk = sink() 
    snk.next() # activate the coroutine 
    source(snk) 

if __name__ == '__main__': 

    from timeit import Timer 
    n = 1000 
    t = Timer("t3()", "from __main__ import source, sink, t3") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 872.99 usec/pass 

看起来不够快。基本上,协程是倒置的发生器,你从发生器中拉出,推入协程。

+0

这是什么架构?考虑到拉模型会导致线程阻塞,只要IO层能够正常工作,那将是最简单的编程模型。 – Marcin 2012-03-19 12:28:56

+0

如果该人需要阻止,那么实际上回调可能是'Queue.put'(更新了答案)。 – bereal 2012-03-19 12:31:14

+0

请参阅我的编辑 – moooeeeep 2012-03-19 12:35:19

1

你并没有真正解决这个问题,但是你希望每个消费者看到完全相同的数据(在这种情况下,tee可能是最好的解决方案)吗?

如果不是,那么您可以简单地让每个消费者从一个生成器对象读取。

如果您确实希望他们获得完全相同的数据,请尝试tee(使用更多内存)与两个生成器(更多IO),并查看哪个更快。

至于你的计时,你的数据显示的只是多个函数调用的开销,并且你的一个方法避免了中间函数调用。

如果你想提高性能,请尝试在PyPy上运行它,它有一个热点优化JIT。

+0

不幸的是PyPy不支持我的其他依赖项。 (事实上​​,问题中没有记录。) – moooeeeep 2012-03-19 13:39:26

+0

@moooeeeep你似乎有很多约束,你根本没有声明。所有这些答案都会按照你的问题回答你的问题,但是你一直抱怨他们不回答你的问题。 – Marcin 2012-03-19 13:44:08

1

由于发电机在内存中便宜,为什么不简单地使用两个独立的发电机?

g1 = xrange(int(1e8)) 
g2 = xrange(int(1e8)) 
sum(g1), sum(g2) 
+0

两次IO,但。 – Marcin 2012-03-19 12:18:00

+0

'xrange'只是一个产生一长串值的过程的例子。请认为它不能轻易复制。 – moooeeeep 2012-03-19 12:20:09