2011-10-24 44 views
21

我想使用Redis的发布 - 订阅传递一些信息,但不希望使用listen被阻塞,如下面的代码:是否可以阻止Redis pubsub?

import redis 
rc = redis.Redis() 

ps = rc.pubsub() 
ps.subscribe(['foo', 'bar']) 

rc.publish('foo', 'hello world') 

for item in ps.listen(): 
    if item['type'] == 'message': 
     print item['channel'] 
     print item['data'] 

最后for节将阻止。我只想检查一个给定的频道是否有数据,我该如何做到这一点?有没有类似check的方法?

+1

是不是有一个原因,你不想被阻止使用听? Redis连接相当便宜,通常会产生其中的几个。 –

+1

使用Redis,ZMQ,Tornado的Python中的异步PubSub - https://github.com/abhinavsingh/async_pubsub –

+1

使用pubsub对象的.get_message()方法代替.listen()(下面有一个示例)。 [发布此问题时,Python Redis驱动程序可能不支持此方法]。 –

回答

7

我不认为这是可能的。频道没有任何“当前数据”,您订阅频道并开始接收由频道上​​的其他客户端推送的消息,因此它是一个阻止API。另外,如果您查看Redis Commands documentation的pub/sub,则会更清楚。

+0

我认为这个答案与另一个相结合是相当完整的。他可以把它放到一个线程中。如果他不想在香奈儿有活动时采取即时行动,那么他可以将其存储在字典中,并拥有自己的检查方法,通过锁互斥锁查看字典 – jdi

1

要达到一个无阻塞的代码,你必须做另一种范例代码。这并不难,用一个新的线程来监听所有的变化,并让主线程去做另一件事情。

此外,您将需要一些机制来交换主线程和redis订阅线程之间的数据。

6

这是一个工作示例,用于对阻塞侦听器进行线程化。

import sys 
import cmd 
import redis 
import threading 


def monitor(): 
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0) 

    channel = sys.argv[1] 
    p = r.pubsub() 
    p.subscribe(channel) 

    print 'monitoring channel', channel 
    for m in p.listen(): 
     print m['data'] 


class my_cmd(cmd.Cmd): 
    """Simple command processor example.""" 

    def do_start(self, line): 
     my_thread.start() 

    def do_EOF(self, line): 
     return True 


if __name__ == '__main__': 
    if len(sys.argv) == 1: 
     print "missing argument! please provide the channel name." 
    else: 
     my_thread = threading.Thread(target=monitor) 
     my_thread.setDaemon(True) 

     my_cmd().cmdloop() 
+0

GIL是否现在进入图片?我们也许可以使用多处理(http://docs.python.org/2/library/multiprocessing.html)呢?但是这种方法会导致创建一个进程的开销 – Pramod

+0

GIL不会破坏队伍,因为你不是在CPU区域运行,而是网络监听。 – Andrew

0

您可以使用gevent,gevent monkey patching来构建一个非阻塞的redis pubsub应用程序。

42

如果您正在考虑非阻塞异步处理,您可能正在使用(或应该使用)异步框架/服务器。

+2

这是应该检查标记的正确答案。我不确定为什么人们会重新发明轮子,还有一个已经存在的用于redis的异步客户端,在这样的客户端存在时并不需要产生新的线程。 – securecurve

+0

@securcurve:公平起见,我在一个标记后的一年多的时间里添加了答案。然而,txRedis和brükva(来自Tornado-Redis分叉的)都是3岁,所以它不是一个真正的借口。 – vartec

+0

这个答案与问题无关。正如接受的答案中指出的那样,redis将消息推送给正在收听的客户端。因此没有办法要求消息。 – Glaslos

1

最有效的方法是基于greenlet的而不是基于线程的。作为基于greenlet的并发框架,gevent在Python世界中已经相当成熟。因此,与redis-py的gevent集成将会非常棒。这正是什么东西被在这个问题上github上讨论:

https://github.com/andymccurdy/redis-py/issues/310

0

的Redis'的pub/sub消息发送给订阅(听)的频道上的客户端。如果你没有听,你会错过信息(因此阻止呼叫)。如果你想让它成为非阻塞的,我建议使用一个队列来代替(redis也很好)。如果必须使用pub/sub,则可以使用建议的gevent以使异步阻塞侦听器将消息推送到队列,并使用单独的使用者以非阻塞的方式处理来自该队列的消息。

9

新版本的redispy支持异步pubsub,请查询https://github.com/andymccurdy/redis-py了解更多详情。 下面是从文档本身的例子:

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 
2

下面是没有线程非阻塞的解决方案:

fd = ps.connection._sock.fileno(); 
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block 
if rlist: 
    for rfd in rlist: 
     if fd == rfd: 
      message = ps.get_message() 

ps.get_message()足以对自己,但我用这个方法,这样我可以等待多个而不仅仅是redis连接。

5

接受的答案已过时,因为redis-py建议您使用非阻塞的get_message()。但它也提供了一种轻松使用线程的方法。

https://pypi.python.org/pypi/redis

有阅读邮件三种不同的策略。

在幕后,get_message()使用系统的“select”模块快速轮询连接的套接字。如果有数据可供读取,get_message()将读取它,格式化消息并将其返回或传递给消息处理程序。如果没有要读取的数据,get_message()将立即返回None。这使得将它集成到应用程序中的现有事件循环中变得微不足道。

while True: 
    message = p.get_message() 
    if message: 
     # do something with the message 
    time.sleep(0.001) # be nice to the system :) 

较旧版本的redis-py只能读取带有pubsub.listen()的消息。 listen()是一个阻塞直到有消息可用的生成器。如果您的应用程序不需要执行任何操作,只需接收和处理从redis接收到的消息,那么listen()就是启动运行的简单方法。

for message in p.listen(): 
    # do something with the message 

第三个选项在单独的线程中运行事件循环。 pubsub.run_in_thread()创建一个新线程并启动事件循环。线程对象返回给run_in_thread()的调用者。调用者可以使用thread.stop()方法关闭事件循环和线程。在幕后,这只是get_message()的一个包装,它运行在一个单独的线程中,本质上为您创建了一个非常小的非阻塞事件循环。 run_in_thread()采用可选的sleep_time参数。如果指定,则事件循环将使用循环的每次迭代中的值调用time.sleep()。

注意:由于我们在单独的线程中运行,因此无法处理未使用已注册的消息处理程序自动处理的消息。因此,如果您订阅了没有附加消息处理程序的模式或通道,redis-py会阻止您调用run_in_thread()。

p.subscribe(**{'my-channel': my_handler}) 
thread = p.run_in_thread(sleep_time=0.001) 
# the event loop is now running in the background processing messages 
# when it's time to shut it down... 
thread.stop() 

所以要回答你的问题,只需检查get_message,当你想知道消息是否已经到达。

相关问题