2009-07-15 51 views
34

我有一个多线程程序,我创建一个生成器函数,然后将其传递给新线程。我希望它在本质上是共享/全局的,因此每个线程都可以从生成器获取下一个值。发电机线程安全吗?

使用像这样的生成器是否安全,还是会遇到从多个线程访问共享生成器的问题/条件?

如果没有,是否有更好的方法来解决这个问题?我需要一些能够循环访问列表并为任何线程调用它的下一个值。

回答

49

它不是线程安全的;同时通话可能会交错,并与局部变量混淆。

常用的方法是使用主从模式(现在称PC中的农民工模式)。创建第三个线程来生成数据,并在主服务器和从服务器之间添加一个队列,从服务器将从队列中读取数据,并且主服务器将向其写入数据。标准队列模块提供必要的线程安全性,并安排阻塞主机,直到从机准备好读取更多数据。

+7

Queue.Queue肯定为+1,适用于组织线程系统(这是大多数情况下,绝对是此任务)的好方法。 – 2009-07-15 13:46:18

-7

这取决于你使用的Python实现。在CPython中,GIL对python对象进行线程安全的所有操作,因为在任何给定的时间只有一个线程可以执行代码。

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

“GIL对python对象进行线程安全操作” - 嗯?所有的操作都不是原子的 – 2009-07-15 18:16:50

+6

这是很危险的误导。 GIL只意味着Python代码不会在多线程环境中破坏Python状态:不能在字节码操作中改变线程。 (例如,您可以修改共享字典而不会破坏它。)您仍然可以在任何两个字节码操作符之间更改线程。 – 2009-07-15 19:45:56

40

编辑以低于基准添加。

你可以用一个锁包裹发电机。例如,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

锁定需要我的系统上50毫秒,队列中取出350毫秒。当你真的有一个队列时,队列是有用的;例如,如果您有传入的HTTP请求,并且您想将它们排队以供工作线程处理。 (这不适合Python迭代器模型 - 一旦迭代器用完了项目,就完成了。)如果你确实有一个迭代器,那么LockedIterator是使线程安全的更快,更简单的方法。

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)