2015-02-08 78 views
4

更新2015年8月:对于想要使用消息传递的人,我目前会推荐使用zeromq。可以用作pykka的补充或完全替代。如何发送RabbitMQ消息给Pykka演员?

我该如何聆听RabbitMQ队列中的消息,然后将它们转发给Pykka中的演员?

目前,当我尝试这样做时,我会发现奇怪的行为,系统停下来停下来。

这里是我有我的演员来实现:

class EventListener(eventlet.EventletActor): 
    def __init__(self, target): 
     """ 
     :param pykka.ActorRef target: Where to send the queue messages. 
     """ 
     super(EventListener, self).__init__() 

     self.target = target 

    def on_start(self): 
     ApplicationService.listen_for_events(self.actor_ref) 

这里是我的ApplicationService类应该检查队列新邮件里面方法:

@classmethod 
def listen_for_events(cls, actor): 
    """ 
    Subscribe to messages and forward them to the given actor. 
    """  
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    channel = connection.channel() 
    channel.queue_declare(queue='test') 
    def callback(ch, method, properties, body): 
     message = pickle.loads(body) 
     actor.tell(message) 

    channel.basic_consume(callback, queue='test', no_ack=True) 
    channel.start_consuming()    

好像start_consuming无限期阻止。我有办法定期自行“轮询”队列吗?

+0

你有什么特别的原因,你在一个程序中同时使用'pika'和'pykka'?似乎你可能会更好地使用自己的'pykka'。 – dano 2015-02-17 00:30:31

+0

要重现此行为,请分享更多代码,如ApplicationServiceClass和其他相关代码吗? – Vinkal 2015-02-17 14:32:26

+0

@dano我需要并发进程在对队列消息的响应中运行。 (想想某种密集的数据分析)。 – drozzy 2015-02-19 02:30:04

回答

3

你所有的代码对我来说都是正确的。如果你想检查每个演员使用的队列,你可以检查actor_inbox属性可用于从Actor#start返回的演员参考。

我从EventletActor继承时遇到类似的问题,以便测试我使用EventletActor和使用ThreadingActor尝试相同的代码。据我从源代码可以看出,他们都使用eventlet来完成工作。 ThreadingActor非常适合我,但EventletActor不适用于ActorRef#tell,它可以与ActorRef#ask一起使用。

我从两个文件开始,位于如下所示的同一目录中。

my_actors.py:初始化两个参与者,它们将通过打印以其类名开头的消息内容来响应消息。

from pykka.eventlet import EventletActor 
import pykka 


class MyThreadingActor(pykka.ThreadingActor): 
    def __init__(self): 
     super(MyThreadingActor, self).__init__() 

    def on_receive(self, message): 
     print(
      "MyThreadingActor Received: {message}".format(
       message=message) 
     ) 


class MyEventletActor(EventletActor): 
    def __init__(self): 
     super(MyEventletActor, self).__init__() 

    def on_receive(self, message): 
     print(
      "MyEventletActor Received: {message}".format(
       message=message) 
     ) 


my_threading_actor_ref = MyThreadingActor.start() 
my_eventlet_actor_ref = MyEventletActor.start() 

my_queue.py:设置队列中鼠兔,将消息发送到其前转发到两个演员设置队列中。在每个演员被告知消息后,他们当前的演员收件箱将被检查队列中的任何内容。

from my_actors import my_threading_actor_ref, my_eventlet_actor_ref 
import pika 


def on_message(channel, method_frame, header_frame, body): 
    print "Received Message", body 
    my_threading_actor_ref.tell({"msg": body}) 
    my_eventlet_actor_ref.tell({"msg": body}) 

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox 
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox 

    channel.basic_ack(delivery_tag=method_frame.delivery_tag) 


queue_name = 'test' 
connection = pika.BlockingConnection() 

channel = connection.channel() 
channel.queue_declare(queue=queue_name) 
channel.basic_consume(on_message, queue_name) 
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message') 

try: 
    channel.start_consuming() 
except KeyboardInterrupt: 
    channel.stop_consuming() 

    # It is very important to stop these actors, otherwise you may lockup 
    my_threading_actor_ref.stop() 
    my_eventlet_actor_ref.stop() 
connection.close() 

当运行my_queue.py输出如下:

接收消息的消息

ThreadingActor收件箱<Queue.Queue instance at 0x10bf55878>

MyThreadingActor收稿:{'msg': 'A Message'}

EventletActor收件箱<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

当我打CTRL+C停止排队,我注意到EventletActor终于收到消息并打印:收到

^C MyEventletActor:{'msg': 'A Message'}

这一切都使我相信在EventletActor中可能存在一个错误,我认为你的代码没问题,并且存在一个我在第一次检查时无法在代码中找到的错误。

我希望这些信息对您有所帮助。

+0

有意思......我停止使用ThreadingActors,因为我需要产生一大堆......但是,好像EventletActors不能与ThreadingActors进行交互操作,所以必须将每个actor都切换到一个线程。 Eventlet对我来说仍然是一个谜。太糟糕pykka没有被更新。 – drozzy 2015-02-19 07:33:02