翼度科技»论坛 编程开发 PHP 查看内容

4.3万字详解PHP+RabbitMQ(AMQP协议、通讯架构、6大模式、交换机队列消息持

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
理论(后半部分有实操详解)

哲学思考


  • 易经思维:向各国人讲述一种动物叫乌龟,要学很久的各国语言,但是随手画一个乌龟,全世界的人都能看得懂。
  • 道家思维:努力没有用(指劳神费心的机械性重复、肢体受累、刻意行为),要用心(深度思考、去感悟、透过现象看本质)才有用。
  • 举例:类似中学做不出来的几何题的底层原理:不是不知道xx定理或公式(招式),而是不知道画辅助线的思路(内功)。
  • 总结:万事万物、用道家思维思考本质与规律,用易经思维从众多信息中简化模型练出来的内功,叫觉悟(内力),而知识仅仅是外功(招式)。
做研发的,可能距离成功一步之遥,别因为一叶障目而放弃。
消息队列与消息中间件


  • 消息队列:一种存储消息的数据结构,消息队列通常是一种先进先出(FIFO)的数据结构,可以确保消息的顺序传递。类比Redis的List,先进先出类比lPush和rPop。
  • 消息中间件:是管理消息传递的一种组件。功能包含消息队列。
  • 中间件:中间件是指位于客户端(或调用端)和服务端之间的组件,用于协调、存储、接口、管理组件之间的通信,类比卖家与买家之间的快递驿站,不是一定要用,但是最好得有。
AMQP协议

AMQP(Advanced Message Queuing Protocol),是一种用于消息传递协议,类比成HTTP、TCP、UDP这种不同的协议就行。它定义了消息中间件客户端与服务端(买家买家对驿站的沟通)的通信规则(怎么运快递),包括消息格式(什么类型的快递)、消息发布(怎么发快递)、消息订阅(怎么收快递)、队列管理(怎么处理快递)等。
AMQP高效就高效在把通信(物流阶段)能遇到的问题都解决了。

  • 异步通信:AMQP支持异步通信,可以让消息发送者和接收者在不同的时间和速度进行消息传递。
  • 消息路由:AMQP协议定义了灵活的消息路由规则,可以根据消息内容自动将消息路由到指定的接收者。
  • 消息确认:AMQP支持消息确认机制,确保消息被可靠地接收。
  • 批量处理:AMQP协议支持批量消息处理,可以同时发送和接收多个消息,减少了网络通信的开销和系统的资源消耗。
RabbitMQ


  • 官方文档:https://www.rabbitmq.com/docs
  • 极简概括:使用ErLang语言写基于AMQP协议的C/S架构的消息中间件,用于不同组件之间高效的传递数据。
  • 解决问题:

    • 削峰:卖家一家伙运了几十吨的货(海量请求)、买家没地方存、扛不住了,那就放驿站缓冲一下。
    • 解耦:没有驿站中转,快递送过来硬塞给买家,买家不在,这个流程就走不下去(耦合)。
    • 异步:卖家只要发货、流程基本走完,剩下的流程交给物流和驿站(中间件),不影响卖家做其它事(非阻塞),买家也一样、快递放驿站、我忙我的。需要时再取(按需订阅)。
    • 卷:面试加分项,工作用到减轻负担。只想卷死各位,或者被各位卷死。

  • 适用场景

    • 流量削峰:当系统抗不出海量的请求的时候,把MQ放置在用户端与业务端之间(强制排队)削去部分峰值流量(Nginx令牌桶和滴水算法、或基于Redis也能实现),这个过程和加锁的缺点差不多,性能会受影响。
    • 应用解耦:小型项目用不上,大型项目中,库存、订单、支付、物流等各种模块,为了防止硬关联耦合度大,一节点挂掉其余瘫痪,所以用MQ作为通信的桥梁。
    • 异步处理:用的最多,异步发短信,发邮件、导出导出文件、延时任务、自动取消订单、推送通知、回调重试等。
    • 助力跳槽涨薪:只想卷死各位,或者被各位卷死。
    • 赋能成就感:RabbitMQ都用上了,感觉离架构师不远了(呵呵)。

  • 优点:

    • 应用场景就是优点,主流。
    • 丰富的客户端支持,支持PHP、Java、Python、Ruby、C、Golang、NodeJS、.NET、Objective-C、Swift。

  • 缺点:

    • 性能下降:流量削峰引起的性能下降。
    • 运维成本:多引入一个组件,就要考虑它的运维成本,以及各项配置问题。
    • 大数据处理:对于大数据、流数据、日志分析、更适合Kafka,RabbitMQ性能会下降。
    • 安装困难:成熟的软件安装不方便,Erlang环境依赖强,其次是官网还把CentOS7的安装去掉了。

  • 同类产品:RocketMQ(仅支持Java和C++)、ActiveMQ(后期停止维护)、Kafka(大数据场景)、基于Redis实现的任务队列(轻量级使用)、编程语言框架提供的支持(Laravel Queue)、AWSMQ(亚马逊)、ApsaraMQ(阿里巴巴)、PulsarMQ(Apache)。
工作架构图



  • Producer:生产者,类比卖家。
  • Connection:客户端与服务端的通信层。
  • Channel:这里叫信道,类比要发那个快递,或从那家快递取货。
  • Broker:用于接受和分发消息的处理,这是MQ Server要干的事,类比驿站。
  • Exchange:交换机(不是switch),指定规则路由到那个队列,类比分配到那个货架的方法。
  • Consumer:消费者,类比买家。
  • 补充:每次访问RabbitMQ都需要建立一个连接,海量的请求对于这块的开销是绝大的,所以需要在channel与connection内部建立逻辑连接,从而减少性能损耗。
  • 多租户设计:每个Block里面可以多个Vhost,Vhost里面,可以有多个Exchange(交换机),每个Exchange可以有多个Queue。
四大概念的通俗理解


  • 生产者:顾名思义生产数据的角色,类比卖家发快递。
  • 消费者:生产者产出的数据,给消费者使用,类比买家收快递。
  • 交换机:,用于接受生产者的消息,通过指定模式(4大模式)和路由键(交换机与队列绑定标识符)分发给队列,一个交换机可绑定多个队列,将消息路由到多个队列,类比快递驿站的分发到那个货架。
  • 队列:一种数据结构,存放消息队列,类比快递驿站的货架。
  • 流程:生产者(发快递)->交换机(驿站怎么分类)->队列(驿站怎么存)->消费者(拿快递)。
Exchange(交换机)


  • 极简概括:位于生产者和队列之间,用于接受生产者的消息,并分发给队列,一个交换机可绑定多个队列,将消息路由到多个队列。
  • 解决问题:生产者发一条消息,让所有或指定消费者能够收到(类似广播)。如果没有交换机机制,只会有一个消费者能收到此消息。
  • 交换机4大类型:直接(direct)类型、主题(topic)类型、头(headers)类型、扇出(fanout)类型(下文有详解)。
  • 补充:在常规模式,工作队列模式(生产者端,basic_publish方法参数2为空字符串),也有一个默认类型交换机,或称之为无名Exchange。
Routing Key(路由键)


  • 极简概括:交换机绑定队列的标识符,一个交换机,可以有多个Routing key。
  • 解决问题:起个名用于区分,方便对不同队列进行不同的操作,就像MySQL表id作用一样。
死信

无法被消费的消息。
死信队列


  • 极简概括:队列中的消息无法被消费,若没有后续的处理,就成了死信队列。
  • 解决问题:将消费异常的数据放入死信队列,用于存储消费失败或者异常时的情况,确保失败的消息能够得到适当的处理(重试或由开发者调试查看用)。可以简单的理解为找个地方存失败的消费任务。也可将计就计,利用某些特性作为延时队列使用。
  • 产生原因:

    • 消息TTL过期。
    • 队列满了。
    • 消息被拒绝(basic reject 或basic nack),并且requeue为false。

有Redis List去实现消息队列,为什么要RabbitMQ?


  • 持久化问题:RabbitMQ支持持久化,Redis虽然也支持持久化,但只要不是每次操作都持久化,那么就有丢失数据的风险。
  • 消息应答问题:消息处理成功与失败,Redis用队列无法记录,任务消息只会取一个少一个,而RabbitMQ可以。
  • 故障转移问题:Redis哨兵机制、主从复制,是针对缓存高可用,做消息中间件有局限性。RabbitMQ支持消息重新入队。如果某个消费者由于某些原因失去连接,导致消息未发送ACK确认,那么RabbitMQ有让消息重新排队的机制,如果此时其它消费者可以处理,那就让其它消费者处理。
  • 支持消息优先级:RabbitMQ支持消息优先级,而Redis不支持。
  • 广播支持:RabbitMQ支持广播,等指定队列发送,而Redis不支持。
  • 路由转发:RabbitMQ通过交换机机制,支持设定不同的分发队列规则,满足各个场景,而Redis List需要手动实现这块内部机制。
有Redis Sorted Set、或者过期监听去实现延时队列,为什么要RabbitMQ?

RabbitMQ是推模式还是拉模式?

都有,生产者发数据到MQ是推,消费者消费消息是拉。
通信方案选择Push还是Pull?

推或拉是两种通信的方向选择,跟MQ无关,但是类似MQ,顺便提一下。
个人认为:

  • 看业务场景,抛开业务场景谈架构都是耍流氓

    • 要求实时性的,就选择推送,轮询耗费网络资源,调用端或客户端每次请求,服务端都得执行一次,尤其是并发量大或者响应流程任务重的场景。
    • 不需要实时性的,就拉取,减低耦合,服务端就纯粹的产生服务端的数据就行,客户端或调用端谁想拉取让它自己来。

  • 看改动开销

    • 保证工程质量的前提下,那种方式开销小,技术老大说用哪个,或者同事们习惯那一种,就用哪一种。

  • 看调用端可信任性。

    • 各种鉴权,验证是一方面,数据传输也是一方面,对于不信任的平台,白推送的数据,与对方直接请求获取。还是有区别的。

RabbitMQ可以直连队列吗?

RabbitMQ内部不可以直连队列,但是操作上可以直连队列。
就算是常规(Hello World)模式,没有声明交换机,也会经过一个默认交换机。
不过这样丧失了交换机灵活的路由分发功能,适用于简单的场景。
实操

安装

Docker安装
  1. docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
  2. 浏览器访问:http://192.168.xxx.xxx:15672
复制代码
普通安装
  1. CentOS7的安装RabbitMQ的教程,已经被官网删除了,支持CentOS8,CentOS需要借助外力。
  2. 安装Erlang
  3. curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
  4. yum -y install erlang
  5. 安装RabbitMQ
  6. curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  7. yum install rabbitmq-server
  8. 开启服务,并设置开机自启
  9. systemctl start rabbitmq-server
  10. systemctl enable rabbitmq-server
  11. 检查状态
  12. systemctl status rabbitmq-server
  13. 启动网页端控制台
  14. rabbitmq-plugins enable rabbitmq_management
  15. 开启防火墙
  16. firewall-cmd --zone=public --add-port=80/tcp --permanent
  17. systemctl restart firewalld
  18. 新建网页端登录用户,并配置角色与权限
  19. rabbitmqctl add_user admin 12345678
  20. rabbitmqctl set_user_tags admin administrator
  21. rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  22. rabbitmqctl set_permissions -p <virtual_host> <username> <configure_permission> <write_permission> <read_permission>
  23. -p <virtual_host>:指定虚拟主机的名称,例如/(默认虚拟主机)。
  24. <username>:要为其设置权限的用户名。
  25. <configure_permission>:配置权限的正则表达式。允许用户对队列、交换机等进行配置。
  26. <write_permission>:写权限的正则表达式。允许用户发布消息。
  27. <read_permission>:读权限的正则表达式。允许用户获取消息。
  28. 浏览器访问
  29. http://192.168.xxx.xxx:15672
  30. 用户名admin,密码12345678
复制代码
命令行常用命令
  1. systemctl start/stop/restart/status/enable rabbitmq-server # RabbitMQ Server开启、关停、重启、状态查看、开机自启
  2. rabbitmq-plugins enable 插件名          # RabbitMQ Server安装插件
  3. rabbitmq-plugins list                   # 插件列表
  4. rabbitmqctl version                     # 查看RabbitMQ Server版本
  5. rabbitmqctl list_exchanges              # 查看交换机列表
  6. rabbitmqctl list_queues                 # 查看队列列表
  7. rabbitmqctl list_bindings               # 查看绑定列表
复制代码
PHP实现RabbitMQ Client


  • RabbitMQ6大模式官方教程:https://www.rabbitmq.com/tutorials
  • 官方扩展(不用):https://pecl.php.net/package/amqp/1.11.0/windows
    这个扩展官方下载地址有最后一版Windows系统的php_amqp.dll的下载地址,(用Windows是为了方便,在CentOS上还需要编译,改完PHP代码每次需要重新上传,不想费事),但是我使用报错,所以废弃了。
  • 官方推荐:composer require php-amqplib/php-amqplib
    PHP操作RabbitMQ思路并不复杂,有6种工作模式,翻来覆去就是研究消息怎么发,发到哪里,怎么处理消息的问题。
  • Laravel框架:可以使用php-amqplib/php-amqplib,去操作,也可以使用现成的laravel-queue-rabbitmq去操作。composer require vladimir-yuldashev/laravel-queue-rabbitmq
消费者常驻进程偏好
  1. while (count($channel->callbacks)) {
  2.     $channel->wait();
  3. }
  4. //或者
  5. $channel->consume();
复制代码
常规模式(Hello World!)


  • 极简概括:生产者生产消息给消费者。
  • 解决问题:跨进程,或跨组件、跨网络通信,适用与两个角色,但是一个PHP进程无法完成的时候用。
  • 备注:用MySQL、Redis也能实现。
  • 生产者端代码:
  1. require_once __DIR__ . '/vendor/autoload.php';
  2. use PhpAmqpLib\Connection\AMQPStreamConnection;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. //初始化连接
  5. $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
  6. //初始化信道
  7. $channel = $connection->channel();
  8. /*
  9. 参数1:队列名
  10. 参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。
  11. 参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启时或宕机后保留下来。
  12. 参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。
  13. 参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。
  14. */
  15. $channel->queue_declare('hello', false, false, false, false);
  16. //编辑消息
  17. $msg = new AMQPMessage('Hello World!');
  18. //发送消息,交换机用不上,所以留空。这方法没有返回值
  19. $channel->basic_publish($msg, '', 'hello');
  20. //用完了就关闭,释放资源
  21. $channel->close();
  22. $connection->close();
复制代码

  • 消费者端代码
  1. require_once __DIR__ . '/vendor/autoload.php';
  2. use PhpAmqpLib\Connection\AMQPStreamConnection;
  3. use PhpAmqpLib\Message\AMQPMessage;
  4. //初始化连接
  5. $connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
  6. //初始化信道
  7. $channel = $connection->channel();
  8. /*
  9. 参数1:队列名
  10. 参数2:在声明队列时指定是否启用passively模式,passively模式用于检查队列是否存在,而不是实际创建一个新队列。如果队列不存在,则会返回一个通知,而不会创建新队列。
  11. 参数3:指定队列的持久性。在这里,它是false,表示队列不是持久的。如果设置为true,则队列将在服务器重启后保留下来。
  12. 参数4:指定队列的排他性。如果设置为 true,则该队列只能被声明它的连接使用,一般用于临时队列。false表示队列不是排它的。
  13. 参数5:指定队列的自动删除,如果设置为 true,则在队列不再被使用时将自动删除。在这里,它是 false,表示队列不会自动删除。
  14. */
  15. $channel->queue_declare('hello', false, false, false, false);
  16. /*
  17. 参数1:队列名称
  18. 参数2:这是消费者标签(consumer tag),用于唯一标识消费者。在这里,它是空字符串,表示不为消费者指定任何特定的标签。
  19. 参数3:如果设置了无本地字段,则服务器将不会向发布消息的连接发送消息。
  20. 参数4:是指定是否自动确认消息(auto-ack)。设置为true则表示消费者在接收到消息后会立即确认消息,RabbitMQ将会将消息标记为已处理并从队列中删除。false表示消费者会手动确认消息,即在处理消息后,通过调用 $channel->basic_ack($deliveryTag) 手动确认消息。
  21. 参数5:指定是否独占消费者。如果设置为true,则表示只有当前连接能够使用该消费者。在这里,它是true,表示只有当前连接可以使用这个消费者。
  22. 参数6:如果设置了,服务器将不会对该方法作出响应。客户端不应等待答复方法。如果服务器无法完成该方法,它将引发通道或连接异常。
  23. 参数7:回调参数,拿到数据怎样处理。
  24. */
  25. $channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
  26.     echo $msg->body;
  27. });
  28. //通过死循环持久化当前进程,实时消费
  29. $channel->consume();
复制代码
工作队列模式(Work Queues)


  • 极简概括:类比Redis的lPush、Rpop。但是RabbitMQ可以针对一个队列有多个消费者,但一条消息,只能被一个消费者消费一次,不能多次消费。为了避免消费者处理数据倾斜问题(有的队列处理任务多,有的处理的少),所以使用了轮询的方式,挨个处理任务。
  • 解决问题:耗时且不需要串行执行的任务,可以丢给队列,例如发短信、邮件、大量数据导入导出。
  • 测试普通用法
生产者代码,在cli模式下,依次输入1~10,执行10次
[code]

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具