2016-07-12 64 views

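Oldsound rabbitmq bundle multiple consumers configuration

I am trying to implement RabbitMQ with https://github.com/php-amqplib/RabbitMqBundle and the Symfony2 framework.

I have managed to get things working with 1 producer and 1 consumer, but the problem starts when I use multiple consumers.

Here is my configuration: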

old_sound_rabbit_mq: 
    connections: 
     default: 
      host:  'localhost' 
      port:  5672 
      user:  'guest' 
      password: 'guest' 
      vhost: '/' 
      lazy:  false 
      connection_timeout: 3 
      read_write_timeout: 3 

      # requires php-amqplib v2.4.1+ and PHP5.4+ 
      keepalive: false 

      # requires php-amqplib v2.4.1+ 
      heartbeat: 0 

      #requires php_sockets.dll 
#   use_socket: true # default false 
    producers: 
     soccer_team_stat: 
      connection:  default 
      exchange_options: {name: 'soccer_team_stat_ex', type: direct} 
      queue_options: {name: 'soccer_team_stat_qu'} 
     soccer_team_stat_form: 
      connection: default 
      exchange_options: {name: 'soccer_team_stat_ex', type: direct} 
      queue_options: {name: 'soccer_team_stat_form_qu'} 
    consumers: 
     soccer_team_stat: 
      connection:  default 
      exchange_options: {name: 'soccer_team_stat_ex', type: direct} 
      queue_options: {name: 'soccer_team_stat_qu'} 
      callback:   myapp.soccer_team_stat.consume 
     soccer_team_stat_form: 
      connection:  default 
      exchange_options: {name: 'soccer_team_stat_ex', type: direct} 
      queue_options: {name: 'soccer_team_stat_form_qu'} 
      callback:   myapp.soccer_team_stat_form.consume 

Service definitions:

<services> 

     <service class="MyApp\EtlBundle\Producers\SoccerTeamStatProducer" id="myapp.soccer_team_stat.produce"> 
      <argument type="service" id="old_sound_rabbit_mq.soccer_team_stat_producer"/> 
     </service> 

     <service class="MyApp\EtlBundle\Producers\SoccerTeamStatProducer" id="myapp.soccer_team_stat_form.produce"> 
      <argument type="service" id="old_sound_rabbit_mq.soccer_team_stat_producer"/> 
     </service> 

     <service class="MyApp\EtlBundle\Consumers\SoccerTeamStatConsumer" id="myapp.soccer_team_stat.consume"> 
      <argument type="service" id="service_container"/> 
     </service> 

     <service class="MyApp\EtlBundle\Consumers\SoccerTeamStatFormConsumer" id="myapp.soccer_team_stat_form.consume"> 
      <argument type="service" id="service_container"/> 
     </service> 

    </services> 

Running php app/console rabbitmq:consumer -d soccer_team_stat_form gives me:

[Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException] You have requested a non-existent service "old_sound_rabbit_mq.soccer_team_stat_form_consumer".

I have tried various combinations, including the multiple_consumers configuration key, but without success. What am I missing?
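
The service id in the error has the same shape as the producer id used in the service definitions above: old_sound_rabbit_mq.<config key>_producer and old_sound_rabbit_mq.<config key>_consumer. As a rough diagnostic sketch, the snippet below builds the ids one would expect from the configuration keys and compares them with a listing of registered services. The `registered` set is hypothetical and only mirrors the error message; in practice it would come from the container's own service listing.

```python
def expected_service_ids(producers, consumers):
    """Build service ids in the pattern seen in the question's XML and error."""
    ids = {"old_sound_rabbit_mq.%s_producer" % name for name in producers}
    ids |= {"old_sound_rabbit_mq.%s_consumer" % name for name in consumers}
    return ids


# Keys taken from the producers/consumers sections of the configuration.
names = ["soccer_team_stat", "soccer_team_stat_form"]
expected = expected_service_ids(names, names)

# Hypothetical container listing, chosen to match the reported error.
registered = {
    "old_sound_rabbit_mq.soccer_team_stat_producer",
    "old_sound_rabbit_mq.soccer_team_stat_form_producer",
    "old_sound_rabbit_mq.soccer_team_stat_consumer",
}

missing = sorted(expected - registered)
print(missing)
```

An id that shows up in `missing` means the container was built without that consumer entry, so the configuration actually loaded for the current environment is the first thing to compare against the file shown here.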

Answer


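If no routing_key/binding_key pair is set, a direct exchange behaves like a fanout exchange and delivers each message to every queue bound to it, because every binding and every published message then share the same empty key. Judging from your configuration, you are better off using fanout exchanges, one per queue, as in the configuration below.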

old_sound_rabbit_mq: 
    connections: 
     default: 
      host:  '%rabbit_mq_host%' 
      port:  '%rabbit_mq_port%' 
      user:  '%rabbit_mq_user%' 
      password: '%rabbit_mq_pswd%' 
      vhost: /
      lazy:  true 
    producers: 
     soccer_team_stat: 
      connection:  default 
      exchange_options: { name: 'soccer_team_stat_ex', type: fanout } 
     soccer_team_stat_form: 
      connection:  default 
      exchange_options: { name: 'soccer_team_stat_form_ex', type: fanout } 
    consumers: 
     soccer_team_stat: 
      connection:  default 
      exchange_options: { name: 'soccer_team_stat_ex', type: fanout } 
      queue_options: { name: 'soccer_team_stat_qu' } 
      callback:   myapp.soccer_team_stat.consume 
     soccer_team_stat_form: 
      connection:  default 
      exchange_options: { name: 'soccer_team_stat_form_ex', type: fanout } 
      queue_options: { name: 'soccer_team_stat_form_qu' } 
      callback:   myapp.soccer_team_stat_form.consume 
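
The routing behaviour described in this answer can be illustrated with a toy model. This is a minimal sketch, not the bundle's code or a real AMQP client: the `Exchange` class and its `bind` and `route` methods are made up for illustration, and it assumes that a queue declared without a routing key is bound with the empty key.

```python
class Exchange:
    """Toy model of an AMQP exchange (illustrative only, not a real client)."""

    def __init__(self, name, kind):
        if kind not in ("direct", "fanout"):
            raise ValueError("unsupported exchange type: " + kind)
        self.name = name
        self.kind = kind
        self.bindings = []  # list of (queue_name, binding_key) pairs

    def bind(self, queue_name, binding_key=""):
        # Assumption: no configured key means the empty binding key.
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key=""):
        """Return the sorted queue names that would receive a message."""
        if self.kind == "fanout":
            # A fanout exchange ignores keys and copies to every bound queue.
            return sorted({queue for queue, _ in self.bindings})
        # A direct exchange needs an exact routing key / binding key match.
        return sorted({queue for queue, key in self.bindings if key == routing_key})


# Layout from the question: one direct exchange, two queues, no keys set.
shared = Exchange("soccer_team_stat_ex", "direct")
shared.bind("soccer_team_stat_qu")
shared.bind("soccer_team_stat_form_qu")
shared_targets = shared.route()

# Layout from this answer: one fanout exchange per queue.
stat = Exchange("soccer_team_stat_ex", "fanout")
stat.bind("soccer_team_stat_qu")
form = Exchange("soccer_team_stat_form_ex", "fanout")
form.bind("soccer_team_stat_form_qu")

print(shared_targets)
print(stat.route())
print(form.route())
```

In the shared direct exchange both queues receive every message, so each consumer would also see messages meant for the other. With a separate exchange per queue, each message reaches only its own queue.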

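RabbitMQ fanout example with symfony including 2 Producer & 2 Exchange & 2 Queue & N Worker & 2 Consumer is a complete example (in fact a full answer to your question, a finished version of what you are trying to do) that shows how this is done in a Symfony application. I suggest you follow the pattern used there. It is very easy to follow and maintain. If you want more examples, just search that blog for the RabbitMQ keyword.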