2013-04-22 49 views
1

我开发了一个android应用程序,它订阅了一个队列并发布到其他队列。一次它发布相同的消息到两个不同的队列,其中之一是一个名为“队列”的队列,现在来自一个appfog实例,我需要订阅“队列”并使用消息并将它们插入到MySQL数据库中。如何从rabbitmq调用模型php消费者在codeigniter中的回调?

我使用codeigniter为上述目的创建了一个php独立应用程序。由于某种原因,工作应用程序失去了与rabbitmq的连接。我想知道最好的方法来做到这一点。 appfog上的工作应用程序如何支持重新启动应用程序?

我需要用什么样的东西来解决上述问题。

我想这个问题不是与rabbitmq连接。它与与mysql插入相关的代码。我检查了我的应用程序的崩溃日志,错误是“Mysql消失了”。 php rabbitmq consumer的一个例子有回调接收消息和register_shutdown。在接收回调函数中,我不能使用$ this代码点火器,因为它超出了范围,而且我使用了get_instance()。我不知道如何调用从RabbitMQ的客户端的方法接收回拨功能

控制器是

<?php 

if (!defined('BASEPATH')) 
    exit('No direct script access allowed'); 

include(__DIR__ . '/php-amqplib/config.php'); 

use PhpAmqpLib\Connection\AMQPConnection; 

class Welcome extends CI_Controller { 

public function __construct() { 
    parent::__construct(); 
} 

public function index() { 
    //connect to rabbitmq and consume messages 
    //insert messages to mysql 
    //$this->messages = array(); 
    $exchange = "router"; 
    $queue = "abbiya"; 

    $conn = new AMQPConnection(HOST, PORT, USER, PASS, VHOST); 
    $ch = $conn->channel(); 

    /* 
     name: $queue 
     passive: false 
     durable: true // the queue will survive server restarts 
     exclusive: false // the queue can be accessed in other channels 
     auto_delete: false //the queue won't be deleted once the channel is closed. 
    */ 
    $ch->queue_declare($queue, false, true, false, false); 

    $ch->queue_bind($queue, $exchange, $queue); 

    /* 
     queue: Queue from where to get the messages 
     consumer_tag: Consumer identifier 
     no_local: Don't receive messages published by this consumer. 
     no_ack: Tells the server if the consumer will acknowledge the messages. 
     exclusive: Request exclusive consumer access, meaning only this consumer can access the queue 
     nowait: 
     callback: A PHP Callback 
    */ 
    $consumer_tag = "abbiya"; 

    $ch->basic_recover(true); 
    $ch->basic_consume($queue, $consumer_tag, false, false, false, false, function($msg) { 
       $message_body = json_decode($msg->body); 
       $msg->delivery_info['channel']-> 
         basic_ack($msg->delivery_info['delivery_tag']); 

       // Send a message with the string "quit" to cancel the consumer. 
       if ($msg->body === 'quit') { 
        $msg->delivery_info['channel']-> 
          basic_cancel($msg->delivery_info['consumer_tag']); 
       } 
       $data = array(
        'sender_id' => $message_body->r, 
        'receiver_id' => $message_body->s, 
        'message_content' => $message_body->m, 
        // 'sent_time' => $message_body->t, 
        'status' => 0 
       ); 
       $ci =& get_instance(); 
       $ci->Message_model->newMessage($data); 
      } 
    ); 

    // Loop as long as the channel has callbacks registered 
    while (count($ch->callbacks)) { 
     $ch->wait(); 
    } 

    register_shutdown_function(function() use ($ch, $conn) { 
       $ch->close(); 
       $conn->close(); 
       $this->index(); 
      } 
    ); 
} 

} 

/* End of file welcome.php */ 
/* Location: ./application/controllers/welcome.php */ 

回答

2

首先,我不得不说制作使用CodeIgniter和PHP消费者可能不是一个好主意,除非你添加这样的东西

if(!$this->input->is_cli_request()){ 
     show_404(); // So they dont know this is an actual script 
     die(); 
    } 

并从命令行运行nohup或daemonize它。您可以使用codeigniter的$ this-> db-> reconnect();您可以使用$ this-> db-> reconnect();优雅地重新连接数据库。

我们已经为php和CI的测试目的做了一些消费者。 我们将回调函数放入助手中,并添加了一种方法让call_back函数返回false,以便消费者在完成后停止监听。

对于我们的生产系统,我们使用更适合工作的python,java或camel消费者。