4.3万字详解PHP+RabbitMQ(AMQP协议、通讯架构、6大模式、交换机队列消息持
理论(后半部分有实操详解)哲学思考
[*]易经思维:向各国人讲述一种动物叫乌龟,要学很久的各国语言,但是随手画一个乌龟,全世界的人都能看得懂。
[*]道家思维:努力没有用(指劳神费心的机械性重复、肢体受累、刻意行为),要用心(深度思考、去感悟、透过现象看本质)才有用。
[*]举例:类似中学做不出来的几何题的底层原理:不是不知道xx定理或公式(招式),而是不知道画辅助线的思路(内功)。
[*]总结:万事万物、用道家思维思考本质与规律,用易经思维从众多信息中简化模型练出来的内功,叫觉悟(内力),而知识仅仅是外功(招式)。
做研发的,可能距离成功一步之遥,别因为一叶障目而放弃。
消息队列与消息中间件
[*]消息队列:一种存储消息的数据结构,消息队列通常是一种先进先出(FIFO)的数据结构,可以确保消息的顺序传递。类比Redis的List,先进先出类比lPush和rPop。
[*]消息中间件:是管理消息传递的一种组件。功能包含消息队列。
[*]中间件:中间件是指位于客户端(或调用端)和服务端之间的组件,用于协调、存储、接口、管理组件之间的通信,类比卖家与买家之间的快递驿站,不是一定要用,但是最好得有。
AMQP协议
AMQP(Advanced Message Queuing Protocol),是一种用于消息传递协议,类比成HTTP、TCP、UDP这种不同的协议就行。它定义了消息中间件客户端与服务端(买家买家对驿站的沟通)的通信规则(怎么运快递),包括消息格式(什么类型的快递)、消息发布(怎么发快递)、消息订阅(怎么收快递)、队列管理(怎么处理快递)等。
AMQP高效就高效在把通信(物流阶段)能遇到的问题都解决了。
[*]异步通信:AMQP支持异步通信,可以让消息发送者和接收者在不同的时间和速度进行消息传递。
[*]消息路由:AMQP协议定义了灵活的消息路由规则,可以根据消息内容自动将消息路由到指定的接收者。
[*]消息确认:AMQP支持消息确认机制,确保消息被可靠地接收。
[*]批量处理:AMQP协议支持批量消息处理,可以同时发送和接收多个消息,减少了网络通信的开销和系统的资源消耗。
RabbitMQ
[*]官方文档:https://www.rabbitmq.com/docs
[*]极简概括:使用ErLang语言写基于AMQP协议的C/S架构的消息中间件,用于不同组件之间高效的传递数据。
[*]解决问题:
[*]削峰:卖家一家伙运了几十吨的货(海量请求)、买家没地方存、扛不住了,那就放驿站缓冲一下。
[*]解耦:没有驿站中转,快递送过来硬塞给买家,买家不在,这个流程就走不下去(耦合)。
[*]异步:卖家只要发货、流程基本走完,剩下的流程交给物流和驿站(中间件),不影响卖家做其它事(非阻塞),买家也一样、快递放驿站、我忙我的。需要时再取(按需订阅)。
[*]卷:面试加分项,工作用到减轻负担。只想卷死各位,或者被各位卷死。
[*]适用场景
[*]流量削峰:当系统抗不出海量的请求的时候,把MQ放置在用户端与业务端之间(强制排队)削去部分峰值流量(Nginx令牌桶和滴水算法、或基于Redis也能实现),这个过程和加锁的缺点差不多,性能会受影响。
[*]应用解耦:小型项目用不上,大型项目中,库存、订单、支付、物流等各种模块,为了防止硬关联耦合度大,一节点挂掉其余瘫痪,所以用MQ作为通信的桥梁。
[*]异步处理:用的最多,异步发短信,发邮件、导出导出文件、延时任务、自动取消订单、推送通知、回调重试等。
[*]助力跳槽涨薪:只想卷死各位,或者被各位卷死。
[*]赋能成就感:RabbitMQ都用上了,感觉离架构师不远了(呵呵)。
[*]优点:
[*]应用场景就是优点,主流。
[*]丰富的客户端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。
[*]缺点:
[*]性能下降:流量削峰引起的性能下降。
[*]运维成本:多引入一个组件,就要考虑它的运维成本,以及各项配置问题。
[*]大数据处理:对于大数据、流数据、日志分析、更适合Kafka,RabbitMQ性能会下降。
[*]安装困难:成熟的软件安装不方便,Erlang环境依赖强,其次是官网还把CentOS7的安装去掉了。
[*]同类产品:RocketMQ(仅支持Java和C++)、ActiveMQ(后期停止维护)、Kafka(大数据场景)、基于Redis实现的任务队列(轻量级使用)、编程语言框架提供的支持(Laravel Queue)、AWSMQ(亚马逊)、ApsaraMQ(阿里巴巴)、PulsarMQ(Apache)。
工作架构图
https://img-blog.csdnimg.cn/direct/5ae53f37e4924f50b3ed4cdf2ea3cbb7.png
[*]Producer:生产者,类比卖家。
[*]Connection:客户端与服务端的通信层。
[*]Channel:这里叫信道,类比要发那个快递,或从那家快递取货。
[*]Broker:用于接受和分发消息的处理,这是MQ Server要干的事,类比驿站。
[*]Exchange:交换机(不是switch),指定规则路由到那个队列,类比分配到那个货架的方法。
[*]Consumer:消费者,类比买家。
[*]补充:每次访问RabbitMQ都需要建立一个连接,海量的请求对于这块的开销是绝大的,所以需要在channel与connection内部建立逻辑连接,从而减少性能损耗。
[*]多租户设计:每个Block里面可以多个Vhost,Vhost里面,可以有多个Exchange(交换机),每个Exchange可以有多个Queue。
四大概念的通俗理解
[*]生产者:顾名思义生产数据的角色,类比卖家发快递。
[*]消费者:生产者产出的数据,给消费者使用,类比买家收快递。
[*]交换机:,用于接受生产者的消息,通过指定模式(4大模式)和路由键(交换机与队列绑定标识符)分发给队列,一个交换机可绑定多个队列,将消息路由到多个队列,类比快递驿站的分发到那个货架。
[*]队列:一种数据结构,存放消息队列,类比快递驿站的货架。
[*]流程:生产者(发快递)->交换机(驿站怎么分类)->队列(驿站怎么存)->消费者(拿快递)。
Exchange(交换机)
[*]极简概括:位于生产者和队列之间,用于接受生产者的消息,并分发给队列,一个交换机可绑定多个队列,将消息路由到多个队列。
[*]解决问题:生产者发一条消息,让所有或指定消费者能够收到(类似广播)。如果没有交换机机制,只会有一个消费者能收到此消息。
[*]交换机4大类型:直接(direct)类型、主题(topic)类型、头(headers)类型、扇出(fanout)类型(下文有详解)。
[*]补充:在常规模式,工作队列模式(生产者端,basic_publish方法参数2为空字符串),也有一个默认类型交换机,或称之为无名Exchange。
Routing Key(路由键)
[*]极简概括:交换机绑定队列的标识符,一个交换机,可以有多个Routing key。
[*]解决问题:起个名用于区分,方便对不同队列进行不同的操作,就像MySQL表id作用一样。
死信
无法被消费的消息。
死信队列
[*]极简概括:队列中的消息无法被消费,若没有后续的处理,就成了死信队列。
[*]解决问题:将消费异常的数据放入死信队列,用于存储消费失败或者异常时的情况,确保失败的消息能够得到适当的处理(重试或由开发者调试查看用)。可以简单的理解为找个地方存失败的消费任务。也可将计就计,利用某些特性作为延时队列使用。
[*]产生原因:
[*]消息TTL过期。
[*]队列满了。
[*]消息被拒绝(basic reject 或basic nack),并且requeue为false。
有Redis List去实现消息队列,为什么要RabbitMQ?
[*]持久化问题:RabbitMQ支持持久化,Redis虽然也支持持久化,但只要不是每次操作都持久化,那么就有丢失数据的风险。
[*]消息应答问题:消息处理成功与失败,Redis用队列无法记录,任务消息只会取一个少一个,而RabbitMQ可以。
[*]故障转移问题:Redis哨兵机制、主从复制,是针对缓存高可用,做消息中间件有局限性。RabbitMQ支持消息重新入队。如果某个消费者由于某些原因失去连接,导致消息未发送ACK确认,那么RabbitMQ有让消息重新排队的机制,如果此时其它消费者可以处理,那就让其它消费者处理。
[*]支持消息优先级:RabbitMQ支持消息优先级,而Redis不支持。
[*]广播支持:RabbitMQ支持广播,等指定队列发送,而Redis不支持。
[*]路由转发:RabbitMQ通过交换机机制,支持设定不同的分发队列规则,满足各个场景,而Redis List需要手动实现这块内部机制。
有Redis Sorted Set、或者过期监听去实现延时队列,为什么要RabbitMQ?
RabbitMQ是推模式还是拉模式?
都有,生产者发数据到MQ是推,消费者消费消息是拉。
通信方案选择Push还是Pull?
推或拉是两种通信的方向选择,跟MQ无关,但是类似MQ,顺便提一下。
个人认为:
[*]看业务场景,抛开业务场景谈架构都是耍流氓
[*]要求实时性的,就选择推送,轮询耗费网络资源,调用端或客户端每次请求,服务端都得执行一次,尤其是并发量大或者响应流程任务重的场景。
[*]不需要实时性的,就拉取,减低耦合,服务端就纯粹的产生服务端的数据就行,客户端或调用端谁想拉取让它自己来。
[*]看改动开销
[*]保证工程质量的前提下,那种方式开销小,技术老大说用哪个,或者同事们习惯那一种,就用哪一种。
[*]看调用端可信任性。
[*]各种鉴权,验证是一方面,数据传输也是一方面,对于不信任的平台,白推送的数据,与对方直接请求获取。还是有区别的。
RabbitMQ可以直连队列吗?
RabbitMQ内部不可以直连队列,但是操作上可以直连队列。
就算是常规(Hello World)模式,没有声明交换机,也会经过一个默认交换机。
不过这样丧失了交换机灵活的路由分发功能,适用于简单的场景。
实操
安装
Docker安装
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
浏览器访问:http://192.168.xxx.xxx:15672普通安装
CentOS7的安装RabbitMQ的教程,已经被官网删除了,支持CentOS8,CentOS需要借助外力。
安装Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum -y install erlang
安装RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install rabbitmq-server
开启服务,并设置开机自启
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
检查状态
systemctl status rabbitmq-server
启动网页端控制台
rabbitmq-plugins enable rabbitmq_management
开启防火墙
firewall-cmd --zone=public --add-port=80/tcp --permanent
systemctl restart firewalld
新建网页端登录用户,并配置角色与权限
rabbitmqctl add_user admin 12345678
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
-p <virtual_host>:指定虚拟主机的名称,例如/(默认虚拟主机)。
<username>:要为其设置权限的用户名。
<configure_permission>:配置权限的正则表达式。允许用户对队列、交换机等进行配置。
<write_permission>:写权限的正则表达式。允许用户发布消息。
<read_permission>:读权限的正则表达式。允许用户获取消息。
浏览器访问
http://192.168.xxx.xxx:15672
用户名admin,密码12345678命令行常用命令
systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server开启、关停、重启、状态查看、开机自启
rabbitmq-plugins enable 插件名 # RabbitMQ Server安装插件
rabbitmq-plugins list # 插件列表
rabbitmqctl version # 查看RabbitMQ Server版本
rabbitmqctl list_exchanges # 查看交换机列表
rabbitmqctl list_queues # 查看队列列表
rabbitmqctl list_bindings # 查看绑定列表PHP实现RabbitMQ Client
[*]RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
[*]官方扩展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
这个扩展官方下载地址有最后一版Windows系统的php_amqp.dll的下载地址,(用Windows是为了方便,在CentOS上还需要编译,改完PHP代码每次需要重新上传,不想费事),但是我使用报错,所以废弃了。
[*]官方推荐:composer require php-amqplib/php-amqplib
PHP操作RabbitMQ思路并不复杂,有6种工作模式,翻来覆去就是研究消息怎么发,发到哪里,怎么处理消息的问题。
[*]Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用现成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq
消费者常驻进程偏好
while (count($channel->callbacks)) {
$channel->wait();
}
//或者
$channel->consume();常规模式(Hello World!)
[*]极简概括:生产者生产消息给消费者。
[*]解决问题:跨进程,或跨组件、跨网络通信,适用与两个角色,但是一个PHP进程无法完成的时候用。
[*]备注:用MySQL、Redis也能实现。
[*]生产者端代码:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化连接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();
/*
参数1:队列名
参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。
参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启时或宕机后保留下来。
参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。
参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。
*/
$channel->queue_declare('hello', false, false, false, false);
//编辑消息
$msg = new AMQPMessage('Hello World!');
//发送消息,交换机用不上,所以留空。这方法没有返回值
$channel->basic_publish($msg, '', 'hello');
//用完了就关闭,释放资源
$channel->close();
$connection->close();
[*]消费者端代码
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//初始化连接
$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
//初始化信道
$channel = $connection->channel();
/*
参数1:队列名
参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。
参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启后保留下来。
参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。
参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。
*/
$channel->queue_declare('hello', false, false, false, false);
/*
参数1:队列名称
参数2:这是消费者标签(consumer tag),用于唯一标识消费者。在这里,它是空字符串,表示不为消费者指定任何特定的标签。
参数3:如果设置了无本地字段,则服务器将不会向发布消息的连接发送消息。
参数4:是指定是否自动确认消息(auto-ack)。设置为true则表示消费者在接收到消息后会立即确认消息,RabbitMQ将会将消息标记为已处理并从队列中删除。false表示消费者会手动确认消息,即在处理消息后,通过调用 $channel->basic_ack($deliveryTag) 手动确认消息。
参数5:指定是否独占消费者。如果设置为true,则表示只有当前连接能够使用该消费者。在这里,它是true,表示只有当前连接可以使用这个消费者。
参数6:如果设置了,服务器将不会对该方法作出响应。客户端不应等待答复方法。如果服务器无法完成该方法,它将引发通道或连接异常。
参数7:回调参数,拿到数据怎样处理。
*/
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
echo $msg->body;
});
//通过死循环持久化当前进程,实时消费
$channel->consume();工作队列模式(Work Queues)
[*]极简概括:类比Redis的lPush、Rpop。但是RabbitMQ可以针对一个队列有多个消费者,但一条消息,只能被一个消费者消费一次,不能多次消费。为了避免消费者处理数据倾斜问题(有的队列处理任务多,有的处理的少),所以使用了轮询的方式,挨个处理任务。
[*]解决问题:耗时且不需要串行执行的任务,可以丢给队列,例如发短信、邮件、大量数据导入导出。
[*]测试普通用法
生产者代码,在cli模式下,依次输入1~10,执行10次
页:
[1]