翼度科技»论坛 编程开发 PHP 查看内容

TP5使用RabbitMQ实现消息队列的项目实践

12

主题

12

帖子

36

积分

新手上路

Rank: 1

积分
36
在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ

1、安装扩展

进入TP5 更目录下,输入命令安装:
  1. composer require php-amqplib/php-amqplib
复制代码
2、自定义命令

TP5 的自定义命令,这里也简单说下。
第一步:
创建命令类文件,新建 application/api/command/Test.php。
  1. <?php
  2. namespace app\api\command;
  3. use think\console\Command;
  4. use think\console\Input;
  5. use think\console\Output;
  6. /**
  7.  * 自定义命令测试
  8.  */
  9. class Test extends Command
  10. {
  11.     /**
  12.      * 配置
  13.      */
  14.     protected function configure()
  15.     {
  16.         // 设置命令的名称和描述
  17.         $this->setName('test')->setDescription('这是一个测试命令');
  18.     }
  19.     /**
  20.      * 执行
  21.      */
  22.     protected function execute(Input $input, Output $output)
  23.     {
  24.         $output->writeln("测试命令");
  25.     }
  26. }
复制代码
这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。
  1. <?php
  2. return [
  3.     'app\api\command\Test',
  4. ];
复制代码
第三步:
测试命令,在项目根目录下输入命令:
  1. php think test
复制代码
回车运行之后输出:
  1. test command
复制代码
到这里,自定义命令就结束了,test命令就自定义成功了。

3、rabbitmq服务端

下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。
在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。
  1. <?php
  2. namespace app\api\command;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use think\console\Command;
  5. use think\console\Input;
  6. use think\console\Output;
  7. /**
  8.  * RabbitMq 启动命令
  9.  */
  10. class Ramq extends Command
  11. {
  12.     protected $consumerTag = 'customer';
  13.     protected $exchange = 'xcuser';
  14.     protected $queue = 'xcmsg';
  15.     protected function configure()
  16.     {
  17.         $this->setName('ramq')->setDescription('rabbitmq');
  18.     }
  19.     protected function execute(Input $input, Output $output)
  20.     {
  21.         $output->writeln("消息队列开始");
  22.         $this->start();
  23.         // 指令输出
  24.         $output->writeln('消费队列结束');
  25.     }
  26.     /**
  27.      * 关闭
  28.      */
  29.     function shutdown($channel, $connection)
  30.     {
  31.         $channel->close();
  32.         $connection->close();
  33.     }
  34.     /**
  35.      * 回调处理信息
  36.      */
  37.     function process_message($message)
  38.     {
  39.         if ($message->body !== 'quit') {
  40.             echo $message->body;
  41.         }
  42.         //手动应答
  43.         $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
  44.         if ($message->body === 'quit') {
  45.             $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
  46.         }
  47.     }
  48.     /**
  49.      * 启动 守护进程运行
  50.      */
  51.     public function start()
  52.     {
  53.         $host = '127.0.0.1';
  54.         $port = 5672;
  55.         $user = 'guest';
  56.         $pwd = 'guest';
  57.         $vhost = '/';
  58.         $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost);
  59.         $channel = $connection->channel();
  60.         $channel->queue_declare($this->queue, false, true, false, false);
  61.         $channel->exchange_declare($this->exchange, 'direct', false, true, false);
  62.         $channel->queue_bind($this->queue, $this->exchange);
  63.         $channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this, 'process_message'));
  64.         register_shutdown_function(array($this, 'shutdown'), $channel, $connection);
  65.         while (count($channel->callbacks)) {
  66.             $channel->wait();
  67.         }
  68.     }
  69. }
复制代码
在application/command.php文件中,添加rabbitmq自定义命令。
  1. return [
  2.     'app\api\command\Ramq',// rabbitmq
  3. ];
复制代码
4、发送端

最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:
  1. <?php
  2. namespace app\api\controller;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  5. use think\Controller;
  6. /**
  7.  * 发送端
  8.  */
  9. class MessageQueue extends Controller
  10. {
  11.     const exchange = 'xcuser';
  12.     const queue = 'xcmsg';
  13.     /**
  14.      * 发送消息
  15.      */
  16.     public function pushMessage($data)
  17.     {
  18.         $host = '127.0.0.1';
  19.         $port = 5672;
  20.         $user = 'guest';
  21.         $pwd = 'guest';
  22.         $vhost = '/';
  23.         $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost);
  24.         $channel = $connection->channel();
  25.         $channel->exchange_declare(self::exchange, 'direct', false, true, false);
  26.         $channel->queue_declare(self::queue, false, true, false, false);
  27.         $channel->queue_bind(self::queue, self::exchange);
  28.         $messageBody = $data;
  29.         $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
  30.         $channel->basic_publish($message, self::exchange);
  31.         $channel->close();
  32.         $connection->close();
  33.         echo 'ok';
  34.     }
  35.     /**
  36.      * 执行
  37.      */
  38.     public function index()
  39.     {
  40.         $data = json_encode(['msg' => '测试数据', 'id' => '15']);
  41.         $this->pushMessage($data);
  42.     }
  43. }
复制代码
5、验证

先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:
  1. php think ramq
复制代码
然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。
到此这篇关于TP5使用RabbitMQ实现消息队列的项目实践的文章就介绍到这了,更多相关TP5 RabbitMQ消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

来源:https://www.jb51.net/program/290641v5a.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具