翼度科技»论坛 编程开发 .net 查看内容

.NET中使用RabbitMQ延时队列和死信队列

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
RabbitMQ延时队列和死信队列

延时队列和死信队列

延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。
延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果消息在过期时间内没有被消费者消费,则会被自动发送到一个预先指定的死信队列中。
在RabbitMQ中,延时队列的实现可以通过以下步骤来完成:

  • 创建一个普通的队列作为延时队列,设置x-message-ttl参数为消息的过期时间。
  • 创建一个死信队列,用于接收延时队列中过期的消息。
  • 将延时队列设置为普通队列的死信交换机,并指定死信路由键。
  • 将消费者绑定到死信队列,以消费延时队列中过期的消息。
使用场景


  • 订单在十分钟内未支付则自动取消。
  • 新创建的店铺,如果十天内都没有上传过商品,则自动发送消息提醒。
  • 账单在一周内未支付,则自动结算。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 还有很多场景就不一一例举了。
TTL设置

方式一:

创建队列时设置x-message-ttl的属性,所有被投递到该队列的消息最多都不会超过60s
  1. var args = new Dictionary<string,object>();
  2. args.Add("x-message-ttl",60000); //单位为毫秒
  3. model.QueueDeclare("myqueue",false,false,false,args);
复制代码
方式二:

为每条消息设置TTL,为每条消息设置过期时间。
  1. IBasicProperties props = model.CreateBasicProperties();
  2. props.ContentType = "text/plain";
  3. props.DeliveryMode = 2;
  4. props.Expiration = "60000"
  5. model.BasicPublic(exchangeName,routingKey,props,messageBodyBytes);
复制代码
代码实践

模拟支付业务
整个项目由三部分组成

  • Web API项目:用于发送订单请求,生产者。
  • 控制台项目一:用于处理订单支付,延时队列。
  • 控制台项目二:用于处理超时未支付的订单,死信队列。
Web API项目

订单类,就简单的写一个用于演示,真实业务肯定不是这样~
  1. public class OrderDto
  2. {
  3.     /// <summary>
  4.     /// 订单名称
  5.     /// </summary>
  6.     public string Name { get; set; }
  7.     /// <summary>
  8.     /// 订单状态
  9.     /// 0 未支付
  10.     /// 1 已支付
  11.     /// 2 超时
  12.     /// </summary>
  13.     public int Status { get; set; }
  14. }
复制代码
控制器
  1. [ApiController]
  2. [Route("api/orders")]
  3. public class OrdersController : ControllerBase
  4. {
  5.     private readonly IOrderService _orderService;
  6.     public OrdersController(IOrderService orderService)
  7.     {
  8.         _orderService = orderService;
  9.     }
  10.     [HttpPost]
  11.     public IActionResult CreateOrder([FromBody] OrderDto orderDto)
  12.     {
  13.         // 处理订单逻辑
  14.         _orderService.ProcessOrder(orderDto);
  15.         return Ok();
  16.     }
  17. }
复制代码
订单服务
  1. public interface IOrderService
  2. {
  3.     void ProcessOrder(OrderDto orderDto);
  4. }
  5. public class OrderService : IOrderService
  6. {
  7.     private readonly RabbitMQConnectionFactory _connectionFactory;
  8.    
  9.     public OrderService(RabbitMQConnectionFactory connectionFactory)
  10.     {
  11.         _connectionFactory = connectionFactory;
  12.     }
  13.     public void ProcessOrder(OrderDto orderDto)
  14.     {
  15.         using (var channel = _connectionFactory.CreateChannel())
  16.         {
  17.             var properties = channel.CreateBasicProperties();
  18.             properties.Headers = new Dictionary<string, object>
  19.             {
  20.                 { "x-delay", 1000 * 20  } // 设置20秒延时
  21.             };
  22.             var message = JsonConvert.SerializeObject(orderDto);
  23.             var body = Encoding.UTF8.GetBytes(message);
  24.             channel.BasicPublish("delayed_exchange", "routing_key", properties, body);
  25.         }
  26.     }
  27. }
复制代码
支付处理项目

ProcessPay类,用于接收订单消息
  1. public class ProcessPay : IHostedService
  2. {
  3.     private readonly ConnectionFactory _factory;
  4.     private IConnection _connection;
  5.     private IModel _channel;
  6.     public ProcessPay()
  7.     {
  8.         _factory = new ConnectionFactory()
  9.         {
  10.             HostName = "ip",
  11.             Port = 5672,
  12.             UserName = "用户名",
  13.             Password = "密码"
  14.         };
  15.     }
  16.     public Task StartAsync(CancellationToken cancellationToken)
  17.     {
  18.         Console.WriteLine(" Press [enter] to exit.");
  19.         _connection = _factory.CreateConnection();
  20.         _channel = _connection.CreateModel();
  21.         _channel.ExchangeDeclare("delayed_exchange", ExchangeType.Direct, true, false, null);
  22.         //关键代码,绑定死信交换机
  23.         var arguments = new Dictionary<string, object>
  24.         {
  25.             { "x-dead-letter-exchange", "dead_letter_exchange" },
  26.             { "x-dead-letter-routing-key", "dead_letter_routing_key" }
  27.         };
  28.         _channel.QueueDeclare("delayed_queue", true, false, false, arguments);
  29.         _channel.QueueBind("delayed_queue", "delayed_exchange", "routing_key");
  30.         var consumer = new EventingBasicConsumer(_channel);
  31.         consumer.Received += (model, ea) =>
  32.         {
  33.             var body = ea.Body.ToArray();
  34.             var message = Encoding.UTF8.GetString(body);
  35.             // 处理支付逻辑
  36.             var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
  37.             Console.WriteLine($"订单信息:{orderDto.Name}");
  38.             Console.WriteLine("请输入价格(模拟支付):");
  39.             // 超时未支付
  40.             string? many = "";
  41.             // 支付处理
  42.             Console.WriteLine("请输入:");
  43.             // 超时未支付进行处理
  44.             Task.Factory.StartNew(() =>
  45.             {
  46.                 many = Console.ReadLine();
  47.                 Console.WriteLine($"many:{many}");
  48.             }).Wait(20 * 1000);
  49.             if (string.Equals(many, "100"))
  50.             {
  51.                 orderDto.Status = 1;
  52.                 Console.WriteLine("支付完成");
  53.                 _channel.BasicAck(ea.DeliveryTag, true);
  54.             }
  55.             else
  56.             {
  57.                 //重试几次依然失败
  58.                 Console.WriteLine("等待一定时间内失效超时未支付的订单");
  59.                 _channel.BasicNack(ea.DeliveryTag, false, false);
  60.             }
  61.         };
  62.         _channel.BasicConsume("delayed_queue", false, consumer);
  63.         return Task.CompletedTask;
  64.     }
  65.     public Task StopAsync(CancellationToken cancellationToken)
  66.     {
  67.         _channel?.Close();
  68.         _connection?.Close();
  69.         _channel?.Dispose();
  70.         _connection?.Dispose();
  71.         return Task.CompletedTask;
  72.     }
  73. }
复制代码
在Main方法中使用单例模式注册该服务,当然直接将代码写在Main方法也是没有问题的,只不过这种方式方便管理。
  1. static void Main(string[] args)
  2. {
  3.     var host = new HostBuilder()
  4.         .ConfigureServices((hostContext, services) =>
  5.                            {
  6.                                services.AddSingleton<IHostedService,ProcessPay>();
  7.                            })
  8.         .Build();
  9.     host.Run();
  10. }
复制代码
支付超时项目

创建一个死信队列服务,用于订阅死信队列中的订单消息,这里我就直接把代码写在Main方法中了
  1. using (var connection = factory.CreateConnection())
  2. {
  3.     using (var channel = connection.CreateModel())
  4.     {
  5.         channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct, true, false, null);
  6.         channel.QueueDeclare("dead_letter_queue", true, false, false, null);
  7.         channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
  8.         
  9.         var consumer = new EventingBasicConsumer(channel);
  10.         consumer.Received += (model, ea) =>
  11.         {
  12.             var body = ea.Body.ToArray();
  13.             var message = Encoding.UTF8.GetString(body);
  14.             // 处理超时未支付的订单逻辑
  15.             var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
  16.             orderDto.Status = 2;
  17.             Console.WriteLine($"订单信息:{orderDto.Name},{orderDto.Status}");
  18.             Console.WriteLine("超时未支付");
  19.             channel.BasicAck(ea.DeliveryTag, true);
  20.         };
  21.         channel.BasicConsume("dead_letter_queue", false, consumer);
  22.         
  23.         Console.WriteLine(" Press [enter] to exit.");
  24.         Console.ReadLine();
  25.     }
  26. }
复制代码
效果展示

代码看不出效果,直接上图。
首先是3个项目各自运行效果图

然后演示正常消费效果

接下来是超时未支付效果

结尾

这就是一个简单的延时队列和死信队列的代码,模拟了支付超时的场景,这里的数据都写死了的,真实运用的时候肯定是中数据库中获取,修改数据库实体的值。然后死信队列是用于处理在一定时间内未被处理的消息,死信交换机也只是一个普通的交换机,只不过他是用于处理超时的消息的交换机。
对于RabbitMQ的文章基本就结束了,可能还会有一篇RabbitMQ集群搭建的文章,但不是很想去写,最近太懒了~
有问题欢迎指出,活到老学到老~
RabbitMQ系列文章

参考资料


  • 【【2021最新.NET/C#】RabbitMQ从零到高可用集群】

来源:https://www.cnblogs.com/ZYPLJ/archive/2023/07/30/17591838.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具