0
点赞
收藏
分享

微信扫一扫

php语言创建rabbitmq延迟消息队列代码说明

GG_lyf 2022-03-30 阅读 46
<?php

namespace amqp;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class Rabbitmq
{

    private $conn = null;

    private $channel = null; 

    private $exchange_name = null; 

    private $queue_name = null; 

    private $routing_key = null; 

    public function __construct()
    {
       $config = config("amqp.rabbitmq");
       //创建连接
       $this->conn = new AMQPStreamConnection($config['host'], $config['port'],$config['user'],$config['pass'],$config['vhost']);
       //获取通道
       $this->channel = $this->conn->channel();
    }

    // [createDelayDirectRabbitMq 创建延迟队列 仅限direct类型 ]
    // @param [type]  $exchange_name [交换机名称]
    // @param [type]  $queue_name   [队列名称]
    // @param [type]  $routing_key  [路右键]
    // @param integer $expire       [队列过期时间 单位 秒 ]
    // @return [type]               [无]
    public function createDelayDirectRabbitMq($exchange_name,$queue_name,$routing_key,$expire = 0)
    {
       $this->exchange_name = $exchange_name;
       $this->queue_name = $queue_name;
       $this->routing_key = $routing_key;

       $arge = new AMQPTable();
       $arge->set('x-delayed-type','direct');

       $channel = $this->channel;
      $channel->exchange_declare($exchange_name,'x-delayed-message',false,true,false,false,false,$arge);

       if ($expire > 0) {
           $arge = new AMQPTable();
          $arge->set("x-message-ttl",$expire * 1000);
          $channel->queue_declare($queue_name, false, true, false, false, false,$arge);
       }else{
          $channel->queue_declare($queue_name, false, true, false, false);
       }

      $channel->queue_bind($queue_name,$exchange_name,$routing_key);

    }

    // [delaySend 发送延迟队列消息]
    // @param [type] $message [消息]
    // @param [type] $time    [消息延迟秒数]
    // @return [type]         [无]
    public function delaySend($message,$time)
    {
       $channel = $this->channel;

       $arge = new AMQPTable();
       $arge->set("x-delay",$time * 1000);

       $msg = new AMQPMessage($message,[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,"application_headers" => $arge]);
      $channel->basic_publish($msg,$this->exchange_name,$this->routing_key);
    }

    // [createDirectRabbitMq 创建队列 仅限direct类型]
    // @param [type]  $exchange_name     [交换机名称]
    // @param [type]  $queue_name       [队列名称]
    // @param [type]  $routing_key      [路右键]
    // @param integer $expire           [过期时间]
    // @param [type]  $dead_exchange_name [死信交换机]
    // @param [type]  $dead_queue_name   [死信队列]
    // @param [type]  $dead_routing_key  [死信路右键]
    // @return [type]                   [无]
    public function createDirectRabbitMq($exchange_name,$queue_name,$routing_key,$expire = 0,$dead_exchange_name = null,$dead_queue_name = null,$dead_routing_key = null)
    {
       $this->exchange_name = $exchange_name;
       $this->queue_name = $queue_name;
       $this->routing_key = $routing_key;

       $channel = $this->channel;
      $channel->exchange_declare($exchange_name,'direct',false,true,false);

       if ($expire > 0 && $dead_exchange_name && $dead_queue_name && $dead_routing_key) {
          $channel->exchange_declare($dead_exchange_name,'direct',false,true,false);
          $channel->queue_declare($dead_queue_name, false, true, false, false);
          $channel->queue_bind($dead_queue_name,$dead_exchange_name,$dead_routing_key);

           $arge = new AMQPTable();
          $arge->set('x-dead-letter-exchange',$dead_exchange_name);
          $arge->set('x-dead-letter-queue',$dead_queue_name);
          $arge->set('x-dead-letter-routing-key',$dead_routing_key);
          $arge->set("x-message-ttl",$expire * 1000);
          $channel->queue_declare($queue_name, false, true, false, false, false,$arge);
       }else if ($expire > 0) {
           $arge = new AMQPTable();
          $arge->set("x-message-ttl",$expire * 1000);
          $channel->queue_declare($queue_name, false, true, false, false, false,$arge);
       }else{
          $channel->queue_declare($queue_name, false, true, false, false);
       }
      $channel->queue_bind($queue_name,$exchange_name,$routing_key);
    }

    // [send 发送消息]
    // @param [type] $message [消息]
    // @return [type]         [无]
    public function send($message)
    {
       $channel = $this->channel;
       $msg = new AMQPMessage($message,[ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
      $channel->basic_publish($msg,$this->exchange_name,$this->routing_key);
    }

    // [get 获取消息]
    // @param [type] $call [回调函数]
    // @return [type]      [无]
    public function get($call)
    {
       $channel = $this->channel;

       // $call = function($msg){
       //  dump($msg->body);
       // };

      $channel->basic_consume($this->queue_name,'',false, false, false, false,$call);

       while ($channel->is_consuming()) {
          $channel->wait();
       }

    }

}
举报

相关推荐

0 条评论