翼度科技»论坛 编程开发 .net 查看内容

.NET6使用RabbitMQ学习

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
.NET6使用RabbitMQ学习


目录

前提

前段时间上班无事,上网冲浪看到了消息队列RabbitMQ,就想着学习一下,网上看了点资料在哔哩哔哩上看的到codeman讲的一个rabbitmq的视频,就跟着仔细学习一下,敲一下代码。视频地址: rabbitmq视频
RabbitMq介绍

什么是消息队列

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

应用场景

削峰填谷

在一个时间段很多用户同时进行请求我们的A系统,我的MQ容器就可以用来存储请求按照每秒多少的请求进行发送,减轻服务器的压力。
​       


  • 使用了MQ之后,限制消息消费的速度为3000,这样一来,高峰就被“削”掉了,但是因为消息积压,在高峰期过后一段时间内,消费消息的速度还是会维持在3000,直到消费完挤压的消息,这就叫做“填谷”。
  • 使用MQ后,可以提供系统稳定性。

异步提速


  • 在不使用MQ的情况下我们正常用户通过订单系统进行下单,我们需要900多ms,这就会出现用户的体验不好。

  • 在使用MQ的情况出现了总耗时只要25ms就给到了用户回应
    这样提升了用户体验感

所有的问题当你解决一个问题就会出现另外的问题,外部依赖多系统的稳定性就越差,MQ但凡挂了,系统就会出问题,后面就会使用mq集群来解决这一问题。
消息模型

点对点模式



在上图的模型中,有以下概念:

  • Producer:生产者,也就是要发送消息的程序
  • Consumer:消费者:消息的接受者,会一直等待消息到来。
  • Queue:消息队列。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  • 点对点模式只会有一个消费者进行消费
代码附上


新增两个项目一个生产者 Z.RabbitMq.Producer,一个消费者Z.RabbitMQ.Consumer01

  • 项目 Z.RabbitMq.Producer新增HelloProducer类


    1. public class HelloProducer
    2.     {
    3.         public static void HelloWorldShow()
    4.         {
    5.             var factory = new ConnectionFactory();
    6.             factory.HostName = "127.0.0.1";
    7.             factory.Port = 5672;
    8.             factory.UserName = "admin";
    9.             factory.Password = "admin";
    10.             factory.VirtualHost = "my_vhost";
    11.             // 获取TCP 长连接
    12.             using (var connection = factory.CreateConnection())
    13.             {
    14.                 // 创建通信“通道”,相当于TCP中的虚拟连接
    15.                 using (var channel = connection.CreateModel())
    16.                 {
    17.                     /*
    18.                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
    19.                      * 第一个参数:队列名称ID
    20.                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
    21.                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
    22.                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
    23.                      * 其他额外参数为null
    24.                      */
    25.                     channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
    26.                     Console.ForegroundColor = ConsoleColor.Red;
    27.                     string message = "hello CodeMan 666";
    28.                     var body = Encoding.UTF8.GetBytes(message);
    29.                     /*
    30.                      * exchange:交换机,暂时用不到,在进行发布订阅时才会用到
    31.                      * 路由key
    32.                      * 额外的设置属性
    33.                      * 最后一个参数是要传递的消息字节数组
    34.                      */
    35.                     channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body);
    36.                     Console.WriteLine($"producer消息:{message}已发送");
    37.                 }
    38.             }
    39.         }
    40.     }
    复制代码

  • 项目 Z.RabbitMQ.Consumer01新增HelloConsumer类
work消息模型

工作队列或者竞争消费者模式

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
代码附上

新增一个工具类用来获取rabbitmq的连接信息
  1. public class HelloConsumer
  2.     {
  3.         public static void HelloWorldShow()
  4.         {
  5.             var factory = new ConnectionFactory();
  6.             factory.HostName = "127.0.0.1";
  7.             factory.Port = 5672;//5672是RabbitMQ默认的端口号
  8.             factory.UserName = "admin";
  9.             factory.Password = "admin";
  10.             factory.VirtualHost = "my_vhost";
  11.             using (var connection = factory.CreateConnection())
  12.             {
  13.                 using (var channel = connection.CreateModel())
  14.                 {
  15.                     /*
  16.                      * 创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
  17.                      * 第一个参数:队列名称ID
  18.                      * 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
  19.                      * 第三个参数:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
  20.                      * 第四个:是否自动删除,false代表连接停掉后不自动删除这个队列
  21.                      * 其他额外参数为null
  22.                      */
  23.                     //RabbitConstant.QUEUE_HELLO_WORLD 对应的生产者一样名称 "helloworld.queue"
  24.                     channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
  25.                     Console.ForegroundColor = ConsoleColor.Cyan;
  26.                     EventingBasicConsumer consumers = new EventingBasicConsumer(channel);
  27.                     // 触发事件
  28.                     consumers.Received += (model, ea) =>
  29.                     {
  30.                         var body = ea.Body.ToArray();
  31.                         var message = Encoding.UTF8.GetString(body);
  32.                         // false只是确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
  33.                         channel.BasicAck(ea.DeliveryTag, false);
  34.                         Console.WriteLine($"Consumer01接收消息:{message}");
  35.                     };
  36.                     /*
  37.                      * 从MQ服务器中获取数据
  38.                      * 创建一个消息消费者
  39.                      * 第一个参数:队列名
  40.                      * 第二个参数:是否自动确认收到消息,false代表手动确认消息,这是MQ推荐的做法
  41.                      * 第三个参数:要传入的IBasicConsumer接口
  42.                      *
  43.                      */
  44.                     
  45.                     //RabbitConstant.QUEUE_HELLO_WORLD ==  helloworld.queue
  46.                     channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers);
  47.                     Console.WriteLine("Press [Enter] to exit");
  48.                     Console.Read();
  49.                 }
  50.             }
  51.         }
  52.     }
复制代码

  • 消费者1(C1)在刚刚的 Z.RabbitMQ.Consumer01新增SmsReceive类
    在Program.cs中的main函数中进行调用 SmsReceive.Sender();
    消费者1 延迟30ms接受到信息
    1. public class RabbitUtils
    2. {
    3.     public static ConnectionFactory GetConnection()
    4.     {
    5.         var factory = new ConnectionFactory();
    6.         factory.HostName = "127.0.0.1";
    7.         factory.Port = 5672;//5672是RabbitMQ默认的端口号
    8.         factory.UserName = "admin";
    9.         factory.Password = "admin";
    10.         factory.VirtualHost = "my_vhost";
    11.         return factory;
    12.     }
    13. }
    复制代码
  • 消费者2(C2)在刚刚的 Z.RabbitMQ.Consumer02新增SmsReceive类

    消费者1 延迟60ms接受到信息
    1. public class SmsReceive
    2. {
    3.   public static void Sender()
    4.   {
    5.       //使用工具类创建连接
    6.       var connection = RabbitUtils.GetConnection().CreateConnection();
    7.       var channel = connection.CreateModel();
    8.       channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
    9.       // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
    10.       // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
    11.       channel.BasicQos(0, 1, false);
    12.       var consumer = new EventingBasicConsumer(channel);
    13.       consumer.Received += (model, ea) =>
    14.       {
    15.           var body = ea.Body.ToArray();
    16.           var message = Encoding.UTF8.GetString(body);
    17.           Thread.Sleep(30);
    18.           Console.WriteLine($"SmsSender-发送短信成功:{message}");
    19.           channel.BasicAck(ea.DeliveryTag, false);
    20.       };
    21.       channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
    22.       Console.WriteLine("Press [Enter] to exit");
    23.       Console.Read();
    24.   }
    25. }
    复制代码
  • 生产者Z.RabbitMq.Producer中创建SmsSender类在main函数进行调用

    • 发送100条车票订阅的消息
    1. public class SmsReceive
    2. {
    3.     public static void Sender()
    4.     {
    5.         var connection = RabbitUtils.GetConnection().CreateConnection();
    6.         var channel = connection.CreateModel();
    7.         channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
    8.         // 如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
    9.         // basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
    10.         channel.BasicQos(0, 1, false);//处理完一个取一个
    11.         var consumer = new EventingBasicConsumer(channel);
    12.         consumer.Received += (model, ea) =>
    13.         {
    14.             var body = ea.Body.ToArray();
    15.             var message = Encoding.UTF8.GetString(body);
    16.             Thread.Sleep(60);
    17.             Console.WriteLine($"SmsSender-发送短信成功:{message}");
    18.             channel.BasicAck(ea.DeliveryTag, false);
    19.         };
    20.         channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
    21.         Console.WriteLine("Press [Enter] to exit");
    22.         Console.Read();
    23.     }
    24. }
    复制代码
运行结构如下

能者多劳


  • 消费者1比消费者2的效率要快,一次任务的耗时较短
  • 消费者2大量时间处于空闲状态,消费者1一直忙碌
通过channel.BasicAck(ea.DeliveryTag, false);来完成能者多劳的效果,在完成上一次请求之后再去取下一条消息,这就会出现服务器快的消费的更多,慢的消费的更少。
发布订阅模式

Publish/subscribe(交换机类型:Fanout,也称为广播 )


和前面两种模式不同:

  • 声明Exchange,不再声明Queue
  • 发送消息到Exchange,不再发送到Queue,通过exchange发送到queue上
消费者1收到的天气

项目.RabbitMq.Consumer01 创建WeatherFanout使用exchange(交换机)
  1. public class SmsSender
  2. {
  3.     public static void Sender()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
  10.                 for (int i = 0; i < 100; i++)
  11.                 {
  12.                     Sms sms = new Sms("乘客" + i, "139000000" + i, "您的车票已预定成功");
  13.                     string jsonSms = JsonConvert.SerializeObject(sms);
  14.                     var body = Encoding.UTF8.GetBytes(jsonSms);
  15.                     channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body);
  16.                     Console.WriteLine($"正在发送内容:{jsonSms}");
  17.                 }
  18.                 Console.WriteLine("发送数据成功");
  19.             }
  20.         }
  21.     }
  22. }
复制代码
消费者2收到的天气

项目.RabbitMq.Consumer02 创建WeatherFanout使用exchange(交换机)
代码与消费者01一样
生产者发送天气

生产者把消息推送到交换机上
  1. public class WeatherFanout
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
  10.                 // 声明队列信息
  11.                 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  12.                 /*
  13.                          * queueBind 用于将队列与交换机绑定
  14.                          * 参数1:队列名
  15.                          * 参数2:交换机名
  16.                          * 参数3:路由Key(暂时用不到)
  17.                          */
  18.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
  19.                 channel.BasicQos(0, 1, false);
  20.                 var consumer = new EventingBasicConsumer(channel);
  21.                 consumer.Received += ((model, ea) =>
  22.                                       {
  23.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                                           Console.WriteLine($"百度收到的气象信息:{message}");
  25.                                           channel.BasicAck(ea.DeliveryTag, false);
  26.                                       });
  27.                 channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  28.                 Console.WriteLine("Press [Enter] to exit");
  29.                 Console.Read();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
最后得到效果

Routing 路由模型


P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

  • 队列与交换机的绑定,不能是任意绑定,而是要指定一个RoutingKey
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息
生产者
  1. public class WeatherFanout
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 string message = "20度";
  10.                 var body = Encoding.UTF8.GetBytes(message);
  11.                 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);
  12.                 Console.WriteLine("天气信息发送成功!");
  13.             }
  14.         }
  15.     }
  16. }
复制代码
消费者1

接受百度路由的路由消息
  1. public class WeatherDirect
  2. {
  3.      public static void Weather()
  4.      {
  5.          Dictionary<string, string> area = new Dictionary<string, string>();
  6.          area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  7.          area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  8.          area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  9.          area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
  10.          using (var connection = RabbitUtils.GetConnection().CreateConnection())
  11.          {
  12.              using (var channel = connection.CreateModel())
  13.              {
  14.                  foreach (var item in area)
  15.                  {
  16.                      channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
  17.                                           null, Encoding.UTF8.GetBytes(item.Value));
  18.                  }
  19.                  Console.WriteLine("气象信息发送成功!");
  20.              }
  21.          }
  22.      }
  23. }
复制代码
消费者2

接受新浪的路由信息
  1. public class WeatherDirect
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
  10.                 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  11.                 /*
  12.                     * queueBind 用于将队列与交换机绑定
  13.                     * 参数1:队列名
  14.                     * 参数2:交换机名
  15.                     * 参数3:路由Key(暂时用不到)
  16.                     */
  17.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");
  18.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
  19.                 channel.BasicQos(0, 1, false);
  20.                 var consumer = new EventingBasicConsumer(channel);
  21.                 consumer.Received += ((model, ea) =>
  22.                                       {
  23.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                                           Console.WriteLine($"百度收到的气象信息:{message}");
  25.                                           channel.BasicAck(ea.DeliveryTag, false);
  26.                                       });
  27.                 channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  28.                 Console.WriteLine("Press [Enter] to exit");
  29.                 Console.Read();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
最后得到的效果


  • 新浪接收对应新浪的routingkey的信息
  • 百度接收对应百度的routingkey的信息

Topics 通配符模式


routingkey支持通配符匹配格式


  • 通配符格式

    • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过
    • Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符
    • RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分隔,例如:item.insert
    • 通配符规则:#匹配一个或多个词,*恰好匹配一个词,例如item.#能够匹配item.insert.user或者item.insert,item.只能匹配item.insert或者item.user

生产者
  1. public class WeatherDirect
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
  10.                 // 声明队列信息
  11.                 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
  12.                 /*
  13.                      * queueBind 用于将队列与交换机绑定
  14.                      * 参数1:队列名
  15.                      * 参数2:交换机名
  16.                      * 参数3:路由Key
  17.                      */
  18.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");
  19.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");
  20.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
  21.                 channel.BasicQos(0, 1, false);
  22.                 var consumer = new EventingBasicConsumer(channel);
  23.                 consumer.Received += ((model, ea) =>
  24.                                       {
  25.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  26.                                           Console.WriteLine($"新浪收到的气象信息:{message}");
  27.                                           channel.BasicAck(ea.DeliveryTag, false);
  28.                                       });
  29.                 channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
  30.                 Console.WriteLine("Press [Enter] to exit");
  31.                 Console.Read();
  32.             }
  33.         }
  34.     }
  35. }
复制代码
消费者1

获取交换机中通配符为china.#的信息

  • ("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  • ("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  • ("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  1. public class WeatherTopic
  2. {
  3.     public static void Weather()
  4.     {
  5.         Dictionary<string, string> area = new Dictionary<string, string>();
  6.         area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  7.         area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  8.         area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  9.         area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
  10.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  11.         {
  12.             using (var channel = connection.CreateModel())
  13.             {
  14.                 foreach (var item in area)
  15.                 {
  16.                     channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
  17.                                          null, Encoding.UTF8.GetBytes(item.Value));
  18.                 }
  19.                 Console.WriteLine("气象信息发送成功!");
  20.             }
  21.         }
  22.     }
  23. }
复制代码
消费者2

获取交换机中通配符为china.hubei.*.20210525的信息

  • ("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据")
  • ("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据")
  1. public class WeatherTopic
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
  10.                 // 声明队列信息
  11.                 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  12.                 /*
  13.                      * queueBind 用于将队列与交换机绑定
  14.                      * 参数1:队列名
  15.                      * 参数2:交换机名
  16.                      * 参数3:路由Key(暂时用不到)
  17.                      */
  18.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
  19.                 channel.BasicQos(0, 1, false);
  20.                 var consumer = new EventingBasicConsumer(channel);
  21.                 consumer.Received += ((model, ea) =>
  22.                                       {
  23.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                                           Console.WriteLine($"百度收到的气象信息:{message}");
  25.                                           channel.BasicAck(ea.DeliveryTag, false);
  26.                                       });
  27.                 channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  28.                 Console.WriteLine("Press [Enter] to exit");
  29.                 Console.Read();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
最后得到的效果


  • 百度获取china.#的信息
  • 新浪获取china.hubei.*.20210525的信息

RPC


基本概念:


  • Callback queue 回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
  • Correlation id 关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。
流程说明:


  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用
分享几题面试题

RabbitMQ中消息可能有的几种状态?


  • alpha: 消息内容(包括消息体、属性和 headers) 和消息索引都存储在内存中 。

    • beta: 消息内容保存在磁盘中,消息索引保存在内存中。
    • gamma: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有 。
    • delta: 消息内容和索引都在磁盘中 。

  • 死信队列?
    DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead  message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之 为死信队列。
  • 导致的死信的几种原因?

    • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
    • 消息TTL过期。
    • 队列满了

到这里就结束,大家如果需要看视频学习就是点最上面的链接就行了
想要源码的可以加QQ群831181779 @做梦达人

来源:https://www.cnblogs.com/fantasy-ke/archive/2023/07/14/17555153.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具