PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
|
大家好,我是码农先森。
我们在某宝或某多多上抢购商品时,如果只是下了订单但没有进行实际的支付,那在订单页面会有一个支付倒计时,要是过了这个时间点那么订单便会自动取消。在这样的业务场景中,一般情况下就会使用到延时队列。
通常在客户下单之后,就会将订单数据推送到延时队列中并且会对该消息设置一个延时时长,比如设置五分钟、十分钟、或十五分钟等,具体的时长应该还是要结合当前的业务进行衡量,然后消费端会在指定时间到达后就对该消息进行支付支付状态判断,如果已经支付则不予处理,要还是未支付,则会取消该订单,并且释放商品库存。
我们这次分享的内容,主要是基于 Redis 延时队列的实现方式,当然除了 Redis 还可以用其他的技术,比如 RabbitMQ、Kafka、RocketMQ 等专业的消息队列。但是我用 Redis 的原因是,它的应用场景比较广泛,我们平时接触也比较多,而且相对于专业的消息队列它没有过多复杂的配置,学起来容易上手,出了问题解决起来也快,学东西的路径都是由易到难嘛。
另外,如果你对上面提到的专业消息队列使用很熟练,也可以将 Redis 更换成它们,这里只是存储介质的不同,技术的实现逻辑上没有太大区别,重要的是设计思想,大家各取所需吧。
好了,我先介绍一下这次延时队列的实现逻辑。主要分为三个部分,一是:消息的发送,如果设置了延时时间则会将消息存储到 Redis 的延时队列中,反之会直接将消息推送到 Redis 的就绪队列中等待消费。二是:将到期的消息从 Redis 延时队列中取出,并且推送到 Redis 的就绪队列中等待消费。三是:消费端会从 Redis 的就绪队列中按顺序读取出消息,并且执行对应的业务处理逻辑,如果处理失败则会将该消息,再次推送到 Redis 的延时队列中进行下一次的重试。
这里说到的延时队列是利用 Redis 有序集合来实现的,它每间隔一秒钟就会被轮询一次,如果有到期的消息,则就会将该消息推送到 Redis 就绪队列,并且从该集合中移除过期的消息,至此就可以等待着消费端进行消费了。接下来我们就从实际的代码出发,来看一下如何实现基于 Redis 的延时队列。
话不多说,开整!我们先来看一下整体的项目目录结构,内容主要分为 PHP 和 Go 两部分。- [manongsen@root php_to_go]$ tree -L 2
- .
- ├── go_delay
- │ ├── app
- │ │ ├── controller
- │ │ │ └── notify.go
- │ │ ├── config
- │ │ │ └── config.go
- │ │ ├── extend
- │ │ │ └── queue.go
- │ │ └── route.go
- │ ├── go.mod
- │ ├── go.sum
- │ └── main.go
- └── php_delay
- │ ├── app
- │ │ ├── controller
- │ │ │ └── Notify.php
- │ ├── composer.json
- │ ├── composer.lock
- │ ├── command
- │ │ └── Consumer.php
- │ ├── route
- │ │ └── app.php
- │ ├── extend
- │ │ └── Queue.php
- │ ├── think
- │ ├── vendor
- │ └── .env
复制代码 ThinkPHP
使用 composer 创建基于 ThinkPHP 框架的 php_delay 项目。- ## 当前目录
- [manongsen@root ~]$ pwd
- /home/manongsen/workspace/php_to_go/php_delay
- ## 安装 ThinkPHP 框架
- [manongsen@root php_delay]$ composer create-project topthink/think php_delay
- [manongsen@root php_delay]$ cp .example.env .env
- ## 安装 Composer 依赖包
- [manongsen@root php_delay]$ composer require predis/predis
- ## 创建一个消费者脚本
- [manongsen@root php_delay]$ php think make:command Consumer
- ## 创建一个生产者脚本,用于测试
- [manongsen@root php_delay]$ php think make:command Producer
复制代码 这个就是延时队列实现的核心类,定义了就绪、延时、失败三个消息队列。send() 方法用于发送消息,其中可以指定 $delay 参数设置延时时间单位是秒。wait() 方法用于消费端监听消息,从下面的代码可以看出这里还利用多进程,父进程的作用是每间隔一秒钟,就从 Redis 有序集合中读取到期的消息,并将该消息推送到 Redis 就绪队列,子进程则阻塞监听就绪队列的消息,并且将接收到的消息回调到用户自定义的业务函数中。
[code] |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|
|
|
发表于 2024-9-2 18:00:51
来自手机
举报
回复
分享
|
|
|
|