1

我开发了一个 android 应用程序,它订阅一个队列并发布到其他队列。一次它将相同的消息发布到两个不同的队列,其中一个是名为“Queue”的队列,现在我需要从 appfog 实例订阅“Queue”并使用消息并将它们插入 mysql db。

我使用 codeigniter 为上述目的创建了一个 php 独立应用程序。由于某种原因,worker 应用程序失去了与 rabbitmq 的连接。我想知道做到这一点的最佳方法。appfog 上的工作应用程序如何能够维持应用程序重新启动。

我需要用什么样的东西来解决上述问题。

我认为问题不在于rabbitmq连接。它与与mysql插入相关的代码。我检查了我的应用程序的崩溃日志,错误是“Mysql 消失了”。一个 php rabbitmq 消费者的示例具有接收消息和 register_shutdown 的回调。在接收回调中,我不能使用 $this 的代码点火器,因为它超出了范围,我正在使用 get_instance()。我不知道如何从 rabbitmq 客户端调用方法接收回调函数

控制器是

<?php

if (!defined('BASEPATH'))
  exit('No direct script access allowed');

include(__DIR__ . '/php-amqplib/config.php');

use PhpAmqpLib\Connection\AMQPConnection;

class Welcome extends CI_Controller {

public function __construct() {
    parent::__construct();
}

public function index() {
    //connect to rabbitmq and consume messages
    //insert messages to mysql
    //$this->messages = array();
    $exchange = "router";
    $queue = "abbiya";

    $conn = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
    $ch = $conn->channel();

    /*
      name: $queue
      passive: false
      durable: true // the queue will survive server restarts
      exclusive: false // the queue can be accessed in other channels
      auto_delete: false //the queue won't be deleted once the channel is closed.
     */
    $ch->queue_declare($queue, false, true, false, false);

    $ch->queue_bind($queue, $exchange, $queue);

    /*
      queue: Queue from where to get the messages
      consumer_tag: Consumer identifier
      no_local: Don't receive messages published by this consumer.
      no_ack: Tells the server if the consumer will acknowledge the messages.
      exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
      nowait:
      callback: A PHP Callback
     */
    $consumer_tag = "abbiya";

    $ch->basic_recover(true);
    $ch->basic_consume($queue, $consumer_tag, false, false, false, false, function($msg) {
                $message_body = json_decode($msg->body);
                $msg->delivery_info['channel']->
                        basic_ack($msg->delivery_info['delivery_tag']);

                // Send a message with the string "quit" to cancel the consumer.
                if ($msg->body === 'quit') {
                    $msg->delivery_info['channel']->
                            basic_cancel($msg->delivery_info['consumer_tag']);
                }
                $data = array(
                    'sender_id' => $message_body->r,
                    'receiver_id' => $message_body->s,
                    'message_content' => $message_body->m,
                    // 'sent_time' => $message_body->t,
                    'status' => 0
                );
                $ci =& get_instance();
                $ci->Message_model->newMessage($data);
            }
    );

    // Loop as long as the channel has callbacks registered
    while (count($ch->callbacks)) {
        $ch->wait();
    }

    register_shutdown_function(function() use ($ch, $conn) {
                $ch->close();
                $conn->close();
                $this->index();
            }
    );
}

}

/* End of file welcome.php */
/* Location: ./application/controllers/welcome.php */
4

2 回答 2

3

首先我不得不说用 CodeIgniter 和 php 做一个消费者可能不是一个好主意,除非你添加这样的东西

    if( !$this->input->is_cli_request() ){
        show_404(); // So they dont know this is an actual script
        die();
    }

并使用 nohup 从命令行运行或对其进行守护。

您可以使用 codeigniter 的 $this->db->reconnect(); 优雅地重新连接数据库。

我们已经创建了一些消费者,用于使用 php 和 CI 进行测试。我们将回调函数放入一个帮助器中,并添加了一种让 call_back 函数返回 false 的方法,以便消费者在我们完成后停止监听。

对于我们的生产系统,我们使用更适合这项工作的 python、java 或骆驼消费者。

于 2013-12-05T20:12:46.017 回答
0

在这里添加一个答案,因为自从这个问题最初发布以来,已经在 Codeigniter 上创建了一个新的 RabbitMQ 库并在 Github 上可用

它很容易设置,并且在自述文件中有分步说明:https ://github.com/romainrg/rabbitmq_client

于 2021-11-24T19:01:54.423 回答