我有一个多线程程序,我创建一个生成器函数,然后将其传递给新线程。我希望它在本质上是共享/全局的,因此每个线程都可以从生成器获取下一个值。发电机线程安全吗?
使用像这样的生成器是否安全,还是会遇到从多个线程访问共享生成器的问题/条件?
如果没有,是否有更好的方法来解决这个问题?我需要一些能够循环访问列表并为任何线程调用它的下一个值。
我有一个多线程程序,我创建一个生成器函数,然后将其传递给新线程。我希望它在本质上是共享/全局的,因此每个线程都可以从生成器获取下一个值。发电机线程安全吗?
使用像这样的生成器是否安全,还是会遇到从多个线程访问共享生成器的问题/条件?
如果没有,是否有更好的方法来解决这个问题?我需要一些能够循环访问列表并为任何线程调用它的下一个值。
它不是线程安全的;同时通话可能会交错,并与局部变量混淆。
常用的方法是使用主从模式(现在称PC中的农民工模式)。创建第三个线程来生成数据,并在主服务器和从服务器之间添加一个队列,从服务器将从队列中读取数据,并且主服务器将向其写入数据。标准队列模块提供必要的线程安全性,并安排阻塞主机,直到从机准备好读取更多数据。
不,它们不是线程安全的。你可以找到关于发电机和多线程的有趣的信息:
这取决于你使用的Python实现。在CPython中,GIL对python对象进行线程安全的所有操作,因为在任何给定的时间只有一个线程可以执行代码。
“GIL对python对象进行线程安全操作” - 嗯?所有的操作都不是原子的 – 2009-07-15 18:16:50
这是很危险的误导。 GIL只意味着Python代码不会在多线程环境中破坏Python状态:不能在字节码操作中改变线程。 (例如,您可以修改共享字典而不会破坏它。)您仍然可以在任何两个字节码操作符之间更改线程。 – 2009-07-15 19:45:56
编辑以低于基准添加。
你可以用一个锁包裹发电机。例如,
import threading
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
gen = [x*2 for x in [1,2,3,4]]
g2 = LockedIterator(gen)
print list(g2)
锁定需要我的系统上50毫秒,队列中取出350毫秒。当你真的有一个队列时,队列是有用的;例如,如果您有传入的HTTP请求,并且您想将它们排队以供工作线程处理。 (这不适合Python迭代器模型 - 一旦迭代器用完了项目,就完成了。)如果你确实有一个迭代器,那么LockedIterator是使线程安全的更快,更简单的方法。
from datetime import datetime
import threading
num_worker_threads = 4
class LockedIterator(object):
def __init__(self, it):
self.lock = threading.Lock()
self.it = it.__iter__()
def __iter__(self): return self
def next(self):
self.lock.acquire()
try:
return self.it.next()
finally:
self.lock.release()
def test_locked(it):
it = LockedIterator(it)
def worker():
try:
for i in it:
pass
except Exception, e:
print e
raise
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
for t in threads:
t.join()
def test_queue(it):
from Queue import Queue
def worker():
try:
while True:
item = q.get()
q.task_done()
except Exception, e:
print e
raise
q = Queue()
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.setDaemon(True)
t.start()
t1 = datetime.now()
for item in it:
q.put(item)
q.join()
start_time = datetime.now()
it = [x*2 for x in range(1,10000)]
test_locked(it)
#test_queue(it)
end_time = datetime.now()
took = end_time-start_time
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000)
Queue.Queue肯定为+1,适用于组织线程系统(这是大多数情况下,绝对是此任务)的好方法。 – 2009-07-15 13:46:18