2009-12-11 105 views

回答

59

,我建议您在开始线程之前实例化一个Queue.Queue,并且把它作为线程的args来一个:线程完成之前,它.put S中它作为参数接收队列结果。父母可以随意.get.get_nowait

队列一般都安排在Python线程同步和通信的最佳途径:他们本质上是线程安全的,消息传递车辆 - 组织一般多任务的最佳途径 - )

+2

'线程完成之前,它.puts它作为一个argument'接收队列中的结果,你的意思是,这将自动蟒蛇做什么?如果不是(意思是设计小费),那么你能否在答案中说清楚。 – n611x007 2013-11-11 12:21:21

+3

为了专业化现有的功能,它很难看。队列对于单个结果问题有很多不必要的开销。更清楚而有效的子类'threading.Thread'和新的run()方法只是简单地将结果存储为像'self.ret = ...'这样的属性(更为舒服的是Thread的子类,它处理返回值/异常自定义的目标函数确实'threading.Thread'应该被扩展以提供开箱即用 - 因为它将与旧的行为“return None”兼容。) – kxr 2016-08-17 10:38:01

1

那么,在Python线程模块中,存在与锁关联的条件对象。一种方法acquire()将返回从底层方法返回的任何值。欲了解更多信息:Python Condition Objects

5

另一种方法!是将回调函数传递给线程。这提供了一种简单,安全和灵活的方法,可以随时从新线程向父节点返回一个值。

# A sample implementation 

import threading 
import time 

class MyThread(threading.Thread): 
    def __init__(self, cb): 
     threading.Thread.__init__(self) 
     self.callback = cb 

    def run(self): 
     for i in range(10): 
      self.callback(i) 
      time.sleep(1) 


# test 

import sys 

def count(x): 
    print x 
    sys.stdout.flush() 

t = MyThread(count) 
t.start() 
+8

问题在于回调仍然在子线程,而不是原始线程。 – babbageclunk 2009-12-11 12:58:43

+0

@wilberforce你能解释一下它可能导致什么问题吗? – 2009-12-11 13:50:12

+4

好的。一个例子是,如果回调函数写入父线程在线程运行时写入的日志文件。由于回调在子线程中运行,因此存在两次写入同时发生并发生冲突的风险 - 如果日志记录框架做了一些内部簿记,则可能会出现乱码或交错输出,或者崩溃。使用一个线程安全的队列并且有一个线程可以避免这种情况。这些问题可能是讨厌的,因为它们不是确定性的 - 它们可能只在生产中出现,并且可能难以再现。 – babbageclunk 2009-12-11 14:39:43

12

如果你调用join()等待线程完成,你可以简单地附上结果线程实例本身,然后从join()方法返回后,主线程检索。

另一方面,您不告诉我们您是如何打算发现线程已完成并且结果可用。如果您已经有了这样做的方法,那么可能会指出您(如果您要告诉我们,请告诉我们)以获得结果的最佳方式。

+0

*您可以简单地将结果附加到Thread实例本身*如何将Thread实例传递到它运行的目标,以便目标可以将结果附加到此实例? – 2012-11-26 21:48:57

+1

Piotr Dobrogost,如果您不是为您的实例创建Thread的子类,那么您可以在目标可调用的末尾使用threading.current_thread()。我会说这有点丑,但亚历克斯的方法总是更优雅。在某些情况下,这种方法更为方便。 – 2012-11-28 16:41:14

+3

如果'join()'只会返回被调用的方法返回的内容,那就好了......看起来很愚蠢,而是返回'None'。 – ArtOfWarfare 2014-12-10 15:51:42

10

您应该传递一个Queue实例作为参数,然后您应该将.put()返回对象放入队列中。你可以通过queue.get()收集返回值,无论你放置什么对象。

样品:

queue = Queue.Queue() 
thread_ = threading.Thread(
       target=target_method, 
       name="Thread1", 
       args=[params, queue], 
       ) 
thread_.start() 
thread_.join() 
queue.get() 

def target_method(self, params, queue): 
""" 
Some operations right here 
""" 
your_return = "Whatever your object is" 
queue.put(your_return) 

使用多线程:

#Start all threads in thread pool 
    for thread in pool: 
     thread.start() 
     response = queue.get() 
     thread_results.append(response) 

#Kill all threads 
    for thread in pool: 
     thread.join() 

我用这个实现,它为我的伟大工程。我希望你这样做。

+1

你不是想念你的thread_.start()? – sadmicrowave 2013-09-13 02:48:05

+1

当然,我开始线我只想念把这一行在这里:)感谢您的通知。 – 2013-09-13 08:50:33

+0

如果你有多个线程,这将如何? que.get()只为我返回一个线程的结果? – ABros 2014-04-16 11:05:09

6

使用拉姆达来包装你的目标线程的功能和使用队列通过其返回值回父线程。 (您原来的目标函数仍然没有额外的队列参数不变。)

示例代码:

import threading 
import queue 
def dosomething(param): 
    return param * 2 
que = queue.Queue() 
thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2)) 
thr.start() 
thr.join() 
while not que.empty(): 
    print(que.get()) 

输出:

4 
2

POC:

import random 
import threading 

class myThread(threading.Thread): 
    def __init__(self, arr): 
     threading.Thread.__init__(self) 
     self.arr = arr 
     self.ret = None 

    def run(self): 
     self.myJob(self.arr) 

    def join(self): 
     threading.Thread.join(self) 
     return self.ret 

    def myJob(self, arr): 
     self.ret = sorted(self.arr) 
     return 

#Call the main method if run from the command line. 
if __name__ == '__main__': 
    N = 100 

    arr = [ random.randint(0, 100) for x in range(N) ] 
    th = myThread(arr) 
    th.start() 
    sortedArr = th.join() 

    print "arr2: ", sortedArr 
3

您可以使用同步queue模块。
考虑您需要检查从数据库中的用户的相关信息与已知ID:

def check_infos(user_id, queue): 
    result = send_data(user_id) 
    queue.put(result) 

现在你可以让你的数据是这样的:

import queue, threading 
queued_request = queue.Queue() 
check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request)) 
check_infos_thread.start() 
final_result = queued_request.get() 
6

我很惊讶没有人提到,你可以只通过它可变:

>>> thread_return={'success': False} 
>>> from threading import Thread 
>>> def task(thread_return): 
... thread_return['success'] = True 
... 
>>> Thread(target=task, args=(thread_return,)).start() 
>>> thread_return 
{'success': True} 

也许这有我不知道的重大问题。

+1

这完美的作品!如果有的话,我们真的很想听听有关这种方法遗漏的事情的一些看法。 – 2016-06-16 15:07:30

+0

工程。就其专业化现有功能而言,其丑陋 - 以及那些令人困惑的事情(可读性) - 请参阅对第一个答案的评论。 – kxr 2016-08-17 10:44:09

+0

用多线程怎么样? – backslash112 2016-08-17 16:29:30

0

以下包装函数将包装现有函数并返回一个指向线程的对象(以便您可以调用start(),join()等)以及访问/查看其最终返回值。

def threadwrap(func,args,kwargs): 
    class res(object): result=None 
    def inner(*args,**kwargs): 
    res.result=func(*args,**kwargs) 
    import threading 
    t = threading.Thread(target=inner,args=args,kwargs=kwargs) 
    res.thread=t 
    return res 

def myFun(v,debug=False): 
    import time 
    if debug: print "Debug mode ON" 
    time.sleep(5) 
    return v*2 

x=threadwrap(myFun,[11],{"debug":True}) 
x.thread.start() 
x.thread.join() 
print x.result 

它看起来OK,和threading.Thread类似乎很容易扩展(*)具有这种功能,所以我不知道为什么它已不存在。上述方法有缺陷吗? (*)请注意,husanu对这个问题的回答完全是这样的,子类threading.Thread产生了一个版本,join()给出了返回值。

1

基于jcomeau_ictx的建议。我遇到的最简单的一个。这里的要求是从服务器上运行的三个不同进程中获得退出状态staus,并在三个都成功时触发另一个脚本。这似乎是工作的罚款

class myThread(threading.Thread): 
     def __init__(self,threadID,pipePath,resDict): 
      threading.Thread.__init__(self) 
      self.threadID=threadID 
      self.pipePath=pipePath 
      self.resDict=resDict 

     def run(self): 
      print "Starting thread %s " % (self.threadID) 
      if not os.path.exists(self.pipePath): 
      os.mkfifo(self.pipePath) 
      pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK) 
      with os.fdopen(pipe_fd) as pipe: 
       while True: 
        try: 
        message = pipe.read() 
        if message: 
         print "Received: '%s'" % message 
         self.resDict['success']=message 
         break 
        except: 
         pass 

    tResSer={'success':'0'} 
    tResWeb={'success':'0'} 
    tResUisvc={'success':'0'} 


    threads = [] 

    pipePathSer='/tmp/path1' 
    pipePathWeb='/tmp/path2' 
    pipePathUisvc='/tmp/path3' 

    th1=myThread(1,pipePathSer,tResSer) 
    th2=myThread(2,pipePathWeb,tResWeb) 
    th3=myThread(3,pipePathUisvc,tResUisvc) 

    th1.start() 
    th2.start() 
    th3.start() 

    threads.append(th1) 
    threads.append(th2) 
    threads.append(th3) 

    for t in threads: 
     print t.join() 

    print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc) 
    # The above statement prints updated values which can then be further processed