RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
1,安装rabbitmq,php扩展
# yum install php-pecl-amqp rabbitmq-server epel-release
2,启动
# /etc/init.d/rabbitmq-server start
如果要改什么配置的话,在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf,注意:文件名是固定的
RABBITMQ_NODE_IP_ADDRESS=192.168.10.208 RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit
3,启用监控模块
# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5/sbin # ./rabbitmq-plugins enable rabbitmq_management //启用监控模块 The following plugins have been enabled: mochiweb webmachine rabbitmq_web_dispatch amqp_client rabbitmq_management_agent rabbitmq_management # /etc/init.d/rabbitmq-server restart //重启
4,php测试代码
<?php class RabbitMq { public $configs = array('host'=>'192.168.10.208','port'=>5672,'username'=>'guest','password'=>'guest','vhost'=>'/'); //交换机名称 public $exchange_name = 'test-e'; //队列名称 public $queue_name = 'test-q'; //路由名称 public $route_key = 'test-r'; /* * 持久化,默认true */ public $durable = true; /* * 自动删除 * exchange is deleted when all queues have finished using it * queue is deleted when last consumer unsubscribes * */ public $autodelete = true; /* * 镜像 * 镜像队列,打开后消息会在节点之间复制,有master和slave的概念 */ public $mirror = false; private $_conn = null; private $_exchange = null; private $_channel = null; private $_queue = null; /** * 创建队列需要用到的虚拟主机、交换机 */ public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') { if(!empty($configs)){ $this->setConfigs($configs); } if(!empty($exchange_name)){ $this->exchange_name = $exchange_name; } if(!empty($queue_name)){ $this->queue_name = $queue_name; } if(!empty($route_key)){ $this->route_key = $route_key; } $this->_conn = new AMQPConnection($this->configs); $this->_conn->connect(); //创建channel $this->_channel = new AMQPChannel($this->_conn); //创建exchange交换机 $this->_exchange = new AMQPExchange($this->_channel); $this->_exchange->setName($this->exchange_name);//创建名字 $this->_exchange->setType(AMQP_EX_TYPE_DIRECT); $this->_exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $this->_exchange->declare(); //创建队列 $this->_queue = new AMQPQueue($this->_channel); //设置队列名字 如果不存在则添加 $this->_queue->setName($this->queue_name); $this->_queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "queue status: ".$this->_queue->declare(); echo "<br />"; echo 'queue bind: '.$this->_queue->bind($this->exchange_name,$this->route_key);//将你的队列绑定到routingKey //echo 'queue bind: '.$this->_queue->bind($this->exchange_name,'route.'.$timestr);//将你的队列绑定到routingKey echo "<br />"; } /** * * 向队列里面添加信息 */ public function setRabbitMqInfo() { //发布消息 $message = json_encode(array('Hello World!','php','c++')); $this->_channel->startTransaction(); echo "send: ".$this->_exchange->publish($message, $this->route_key); //将你的消息通过制定routingKey发送 //echo "send: ".$ex->publish($message, 'route.'.$timestr); //将你的消息通过制定routingKey发送 $this->_channel->commitTransaction(); //$this->_conn->disconnect(); } /** * * 获取队列里的信息 */ public function getRabbitMqInfo () { // $this->_queue->consume(array($this,'processMessage')); //需手动应答 while($this->_queue->declare()){ $messages = $this->_queue->get(AMQP_AUTOACK) ; if ($messages){ echo "<pre>"; print_r(json_decode($messages->getBody(),true))."\n"; } } // disconnect $this->_conn->disconnect(); } }
调用代码:
<?php set_time_limit(0); include_once('rabbitmq.inc.php'); $ra = new RabbitMq(); $ra->setRabbitMqInfo(); $ra->getRabbitMqInfo(); //结果如下 [root@vpn208 ~]# php test.php queue status: 0<br />queue bind: 1<br />send: 1<pre>Array ( [0] => Hello World! [1] => php [2] => c++ )
命令行下调用,还是直接URL访问都是可以的.
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/server/1817.html