2010-11-01 93 views
3

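Errno 9 using the multiprocessing module with Tornado in Python

For operations in my Tornado server that are expected to block (and that can't easily be changed to use something like Tornado's asynchronous HTTP request client), I have been offloading the work to separate worker processes using the multiprocessing module. Specifically, I was using a multiprocessing Pool, because it provides a method called apply_async, which fits well with Tornado since it takes a callback as one of its arguments.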

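I recently realized that a pool preallocates its number of processes, so if all of them are busy with blocking work, any operation that needs a new process has to wait. I do realize that the server can still accept connections, since apply_async works by adding the task to a task queue and returns immediately itself, but what I want is to spawn n processes for the n blocking tasks I need to perform.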
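The pool behaviour described above can be illustrated with a minimal sketch. It assumes a Unix system (it asks for the "fork" start method explicitly), and the names `blocking_task` and `run_in_fixed_pool` are hypothetical, not part of the original program. Four tasks are submitted with apply_async to a pool of two workers, and each task reports the PID of the worker that ran it.

```python
import multiprocessing as mp
import os
import time


def blocking_task(seconds):
    """Stand-in for a blocking operation; returns the worker's PID."""
    time.sleep(seconds)
    return os.getpid()


def run_in_fixed_pool(pool_size=2, task_count=4, seconds=0.1):
    """Submit task_count tasks to a pool of pool_size preallocated workers."""
    ctx = mp.get_context("fork")  # assumption: Unix-only start method
    finished_pids = []
    pool = ctx.Pool(processes=pool_size)
    try:
        # apply_async returns immediately; the callback runs in the parent
        # process once a worker has finished the task.
        pending = [
            pool.apply_async(blocking_task, (seconds,),
                             callback=finished_pids.append)
            for _ in range(task_count)
        ]
        for result in pending:
            result.get(timeout=30)
    finally:
        pool.close()
        pool.join()
    return finished_pids


if __name__ == "__main__":
    pids = run_in_fixed_pool()
    print("tasks finished:", len(pids), "distinct workers:", len(set(pids)))
```

However many tasks are queued, they are shared among the same preallocated workers, so the extra tasks wait for a free worker rather than getting a process of their own.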

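I figured I could use the add_handler method of my Tornado server's IOLoop to add a handler to that IOLoop for each new PID I create. I've done something similar before, but that used popen and an arbitrary command. An example of that approach is here. However, I want to pass arguments to an arbitrary target Python function within my scope, so I want to stick with multiprocessing.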

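However, it seems that something doesn't like the PIDs that my multiprocessing.Process objects have. I get IOError: [Errno 9] Bad file descriptor. Are these processes restricted in some way? I know that the PID isn't available until I actually start the process, but I do start the process. Here is the source code of an example I made that demonstrates the problem: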

#!/usr/bin/env python 

"""Creates a small Tornado program to demonstrate asynchronous programming. 
Specifically, this demonstrates using the multiprocessing module.""" 

import tornado.httpserver 
import tornado.ioloop 
import tornado.web 
import multiprocessing as mp 
import random 
import time 

__author__ = 'Brian McFadden' 
__email__ = '[email protected]' 

def sleepy(queue): 
    """Pushes a string to the queue after sleeping for 5 seconds. 
    This sleeping can be thought of as a blocking operation.""" 

    time.sleep(5) 
    queue.put("Now I'm awake.") 
    return 

def random_num(): 
    """Returns a string containing a random number. 
    This function can be used by handlers to receive text for writing which 
    facilitates noticing change on the webpage when it is refreshed.""" 

    n = random.random() 
    return "<br />Here is a random number to show change: {0}".format(n) 

class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handling a request synchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())

class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handling a request asynchronously.
    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).
        Instead of directly invoking sleepy(), it hands the function to a
        separate multiprocessing.Process."""

        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.
        Operation of this function occurs in the original process."""

        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()

class MainHandler(tornado.web.RequestHandler): 
    """Returns a string and a random number. 
    Try to access this page in one window immediately after (<5 seconds of) 
    accessing /async or /sync in another window to see the difference between 
    them. Asynchronously performing the sleepy() function won't make the client 
    wait for data from this handler, but synchronously doing so will!""" 

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())

if __name__ == '__main__': 
    # Create an application using the above handlers. 
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application. 
    http_server = tornado.httpserver.HTTPServer(application) 
    http_server.listen(8888) 
    print 'The HTTP server is listening on port 8888.' 
    tornado.ioloop.IOLoop.instance().start() 

Here is the traceback:

Traceback (most recent call last): 
    File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context 
    yield 
    File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext 
    yield 
    File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute 
    getattr(self, self.request.method.lower())(*args, **kwargs) 
    File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper 
    return method(self, *args, **kwargs) 
    File "./process_async.py", line 73, in get 
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ) 
    File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler 
    self._impl.register(fd, events | self.ERROR) 
IOError: [Errno 9] Bad file descriptor 

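The code above is actually modified from an older example that used process pools. I have kept it around for a while as a reference for my coworkers and myself (hence the heavy commenting). I structured it so that I could open two small browser windows side by side and show my boss that the /sync URI blocks connections while /async allows more connections. For the purposes of this question, all you need to do to reproduce the problem is try to access the /async handler. It errors immediately.

What should I do about this? How can the PID be "bad"? If you run the program, you can see it printed to standard output.

For the record, I'm using Python 2.6.5 on Ubuntu 10.04. Tornado is 1.1.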

+0

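I know there wasn't much interest in this topic, but for future stumblers: as someone on the Tornado mailing list pointed out, the PID is not equivalent to the file descriptor of its pipe. I couldn't find a programmatic way to access stdout that is friendly to 'multiprocessing.Process' objects, so I used a 'multiprocessing.Pipe' object and gave one FD to the IOLoop and the other as an argument to the process. Be careful to keep the pipe open for as long as you need it (garbage collection -> corruption and crashes). – Brian 2010-11-02 20:31:23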
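The pipe-based approach in the comment above might look roughly like the following sketch. It is not the author's actual code: it uses the standard library's `selectors` module as a stand-in for the Tornado IOLoop so that it runs without Tornado, it assumes a Unix system (the "fork" start method), and `run_blocking_task` is a hypothetical name. With Tornado, the registration step would instead be a call along the lines of `add_handler(parent_conn.fileno(), handler, IOLoop.READ)`.

```python
import multiprocessing as mp
import selectors
import time


def sleepy(conn, seconds):
    """Child process: block for a while, then report back over the pipe."""
    time.sleep(seconds)
    conn.send("Now I'm awake.")
    conn.close()


def run_blocking_task(seconds=0.2, timeout=10.0):
    """Start one process per task and wait on the pipe's file descriptor."""
    ctx = mp.get_context("fork")  # assumption: Unix-only start method
    # duplex=False gives a (receive-only, send-only) pair of connections.
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=sleepy, args=(child_conn, seconds))
    proc.start()
    child_conn.close()  # the parent does not need the sending end

    messages = []
    # Register the pipe's file descriptor, not the PID, with the event loop.
    with selectors.DefaultSelector() as selector:
        selector.register(parent_conn.fileno(), selectors.EVENT_READ)
        if selector.select(timeout):
            messages.append(parent_conn.recv())

    proc.join()
    parent_conn.close()
    return messages


if __name__ == "__main__":
    print(run_blocking_task())
```

Note that `parent_conn` stays referenced until the result has been read, which matches the warning above about keeping the pipe open for as long as it is needed.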

+2

Maybe you should answer your own question and post a working code sample? I would appreciate it. – oDDsKooL 2012-10-10 12:38:38

Answers

2

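add_handler takes a valid file descriptor, not a PID. As an example of what is expected, Tornado itself normally uses add_handler by passing in the result of a socket object's fileno(), which returns that object's file descriptor. The PID is irrelevant in this context.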
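As a small illustration of where Errno 9 comes from, the sketch below (standard library only, with the hypothetical helper name `describe_fd`) asks the operating system about a number twice: once while it is an open pipe descriptor and once after the pipe has been closed. A PID is in general just such a number that does not refer to an open descriptor in the calling process.

```python
import errno
import os


def describe_fd(number):
    """Return 'open' if number is an open file descriptor, else the errno name."""
    try:
        os.fstat(number)
    except OSError as exc:
        return errno.errorcode.get(exc.errno, str(exc.errno))
    return "open"


read_fd, write_fd = os.pipe()
status_while_open = describe_fd(read_fd)

os.close(read_fd)
os.close(write_fd)
# The integer is unchanged, but it no longer names an open descriptor.
status_after_close = describe_fd(read_fd)

print(status_while_open, status_after_close)
```

EBADF is the symbolic name for errno 9, the same "Bad file descriptor" error shown in the traceback.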

+0

Any suggestions for making this code work, then? – oDDsKooL 2012-10-10 12:09:16

+0

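You probably need to add something like process.stdout.fileno, so that you pass a file descriptor number rather than the PID of the process. Just a thought, though; I haven't tried it myself. – securecurve 2012-12-23 16:43:19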
