<?php
namespace amqp;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Rabbitmq
{
private $conn = null;
private $channel = null;
private $exchange_name = null;
private $queue_name = null;
private $routing_key = null;
public function __construct()
{
$config = config("amqp.rabbitmq");
//创建连接
$this->conn = new AMQPStreamConnection($config['host'], $config['port'],$config['user'],$config['pass'],$config['vhost']);
//获取通道
$this->channel = $this->conn->channel();
}
// [createDelayDirectRabbitMq 创建延迟队列 仅限direct类型 ]
// @param [type] $exchange_name [交换机名称]
// @param [type] $queue_name [队列名称]
// @param [type] $routing_key [路右键]
// @param integer $expire [队列过期时间 单位 秒 ]
// @return [type] [无]
public function createDelayDirectRabbitMq($exchange_name,$queue_name,$routing_key,$expire = 0)
{
$this->exchange_name = $exchange_name;
$this->queue_name = $queue_name;
$this->routing_key = $routing_key;
$arge = new AMQPTable();
$arge->set('x-delayed-type','direct');
$channel = $this->channel;
$channel->exchange_declare($exchange_name,'x-delayed-message',false,true,false,false,false,$arge);
if ($expire > 0) {
$arge = new AMQPTable();
$arge->set("x-message-ttl",$expire * 1000);
$channel->queue_declare($queue_name, false, true, false, false, false,$arge);
}else{
$channel->queue_declare($queue_name, false, true, false, false);
}
$channel->queue_bind($queue_name,$exchange_name,$routing_key);
}
// [delaySend 发送延迟队列消息]
// @param [type] $message [消息]
// @param [type] $time [消息延迟秒数]
// @return [type] [无]
public function delaySend($message,$time)
{
$channel = $this->channel;
$arge = new AMQPTable();
$arge->set("x-delay",$time * 1000);
$msg = new AMQPMessage($message,[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,"application_headers" => $arge]);
$channel->basic_publish($msg,$this->exchange_name,$this->routing_key);
}
// [createDirectRabbitMq 创建队列 仅限direct类型]
// @param [type] $exchange_name [交换机名称]
// @param [type] $queue_name [队列名称]
// @param [type] $routing_key [路右键]
// @param integer $expire [过期时间]
// @param [type] $dead_exchange_name [死信交换机]
// @param [type] $dead_queue_name [死信队列]
// @param [type] $dead_routing_key [死信路右键]
// @return [type] [无]
public function createDirectRabbitMq($exchange_name,$queue_name,$routing_key,$expire = 0,$dead_exchange_name = null,$dead_queue_name = null,$dead_routing_key = null)
{
$this->exchange_name = $exchange_name;
$this->queue_name = $queue_name;
$this->routing_key = $routing_key;
$channel = $this->channel;
$channel->exchange_declare($exchange_name,'direct',false,true,false);
if ($expire > 0 && $dead_exchange_name && $dead_queue_name && $dead_routing_key) {
$channel->exchange_declare($dead_exchange_name,'direct',false,true,false);
$channel->queue_declare($dead_queue_name, false, true, false, false);
$channel->queue_bind($dead_queue_name,$dead_exchange_name,$dead_routing_key);
$arge = new AMQPTable();
$arge->set('x-dead-letter-exchange',$dead_exchange_name);
$arge->set('x-dead-letter-queue',$dead_queue_name);
$arge->set('x-dead-letter-routing-key',$dead_routing_key);
$arge->set("x-message-ttl",$expire * 1000);
$channel->queue_declare($queue_name, false, true, false, false, false,$arge);
}else if ($expire > 0) {
$arge = new AMQPTable();
$arge->set("x-message-ttl",$expire * 1000);
$channel->queue_declare($queue_name, false, true, false, false, false,$arge);
}else{
$channel->queue_declare($queue_name, false, true, false, false);
}
$channel->queue_bind($queue_name,$exchange_name,$routing_key);
}
// [send 发送消息]
// @param [type] $message [消息]
// @return [type] [无]
public function send($message)
{
$channel = $this->channel;
$msg = new AMQPMessage($message,[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg,$this->exchange_name,$this->routing_key);
}
// [get 获取消息]
// @param [type] $call [回调函数]
// @return [type] [无]
public function get($call)
{
$channel = $this->channel;
// $call = function($msg){
// dump($msg->body);
// };
$channel->basic_consume($this->queue_name,'',false, false, false, false,$call);
while ($channel->is_consuming()) {
$channel->wait();
}
}
}