|
RabbitMQ延时队列和死信队列
延时队列和死信队列
延时队列是RabbitMQ中的一种特殊队列,它可以在消息到达队列后延迟一段时间再被消费。
延时队列的实现原理是通过使用消息的过期时间和死信队列来实现。当消息被发送到延时队列时,可以为消息设置一个过期时间,这个过期时间决定了消息在延时队列中等待的时间。如果消息在过期时间内没有被消费者消费,则会被自动发送到一个预先指定的死信队列中。
在RabbitMQ中,延时队列的实现可以通过以下步骤来完成:
- 创建一个普通的队列作为延时队列,设置x-message-ttl参数为消息的过期时间。
- 创建一个死信队列,用于接收延时队列中过期的消息。
- 将延时队列设置为普通队列的死信交换机,并指定死信路由键。
- 将消费者绑定到死信队列,以消费延时队列中过期的消息。
使用场景
- 订单在十分钟内未支付则自动取消。
- 新创建的店铺,如果十天内都没有上传过商品,则自动发送消息提醒。
- 账单在一周内未支付,则自动结算。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 还有很多场景就不一一例举了。
TTL设置
方式一:
创建队列时设置x-message-ttl的属性,所有被投递到该队列的消息最多都不会超过60s- var args = new Dictionary<string,object>();
- args.Add("x-message-ttl",60000); //单位为毫秒
- model.QueueDeclare("myqueue",false,false,false,args);
复制代码 方式二:
为每条消息设置TTL,为每条消息设置过期时间。- IBasicProperties props = model.CreateBasicProperties();
- props.ContentType = "text/plain";
- props.DeliveryMode = 2;
- props.Expiration = "60000"
- model.BasicPublic(exchangeName,routingKey,props,messageBodyBytes);
复制代码 代码实践
模拟支付业务
整个项目由三部分组成
- Web API项目:用于发送订单请求,生产者。
- 控制台项目一:用于处理订单支付,延时队列。
- 控制台项目二:用于处理超时未支付的订单,死信队列。
Web API项目
订单类,就简单的写一个用于演示,真实业务肯定不是这样~- public class OrderDto
- {
- /// <summary>
- /// 订单名称
- /// </summary>
- public string Name { get; set; }
- /// <summary>
- /// 订单状态
- /// 0 未支付
- /// 1 已支付
- /// 2 超时
- /// </summary>
- public int Status { get; set; }
- }
复制代码 控制器
- [ApiController]
- [Route("api/orders")]
- public class OrdersController : ControllerBase
- {
- private readonly IOrderService _orderService;
- public OrdersController(IOrderService orderService)
- {
- _orderService = orderService;
- }
- [HttpPost]
- public IActionResult CreateOrder([FromBody] OrderDto orderDto)
- {
- // 处理订单逻辑
- _orderService.ProcessOrder(orderDto);
- return Ok();
- }
- }
复制代码 订单服务
- public interface IOrderService
- {
- void ProcessOrder(OrderDto orderDto);
- }
- public class OrderService : IOrderService
- {
- private readonly RabbitMQConnectionFactory _connectionFactory;
-
- public OrderService(RabbitMQConnectionFactory connectionFactory)
- {
- _connectionFactory = connectionFactory;
- }
- public void ProcessOrder(OrderDto orderDto)
- {
- using (var channel = _connectionFactory.CreateChannel())
- {
- var properties = channel.CreateBasicProperties();
- properties.Headers = new Dictionary<string, object>
- {
- { "x-delay", 1000 * 20 } // 设置20秒延时
- };
- var message = JsonConvert.SerializeObject(orderDto);
- var body = Encoding.UTF8.GetBytes(message);
- channel.BasicPublish("delayed_exchange", "routing_key", properties, body);
- }
- }
- }
复制代码 支付处理项目
ProcessPay类,用于接收订单消息- public class ProcessPay : IHostedService
- {
- private readonly ConnectionFactory _factory;
- private IConnection _connection;
- private IModel _channel;
- public ProcessPay()
- {
- _factory = new ConnectionFactory()
- {
- HostName = "ip",
- Port = 5672,
- UserName = "用户名",
- Password = "密码"
- };
- }
- public Task StartAsync(CancellationToken cancellationToken)
- {
- Console.WriteLine(" Press [enter] to exit.");
- _connection = _factory.CreateConnection();
- _channel = _connection.CreateModel();
- _channel.ExchangeDeclare("delayed_exchange", ExchangeType.Direct, true, false, null);
- //关键代码,绑定死信交换机
- var arguments = new Dictionary<string, object>
- {
- { "x-dead-letter-exchange", "dead_letter_exchange" },
- { "x-dead-letter-routing-key", "dead_letter_routing_key" }
- };
- _channel.QueueDeclare("delayed_queue", true, false, false, arguments);
- _channel.QueueBind("delayed_queue", "delayed_exchange", "routing_key");
- var consumer = new EventingBasicConsumer(_channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- // 处理支付逻辑
- var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
- Console.WriteLine($"订单信息:{orderDto.Name}");
- Console.WriteLine("请输入价格(模拟支付):");
- // 超时未支付
- string? many = "";
- // 支付处理
- Console.WriteLine("请输入:");
- // 超时未支付进行处理
- Task.Factory.StartNew(() =>
- {
- many = Console.ReadLine();
- Console.WriteLine($"many:{many}");
- }).Wait(20 * 1000);
- if (string.Equals(many, "100"))
- {
- orderDto.Status = 1;
- Console.WriteLine("支付完成");
- _channel.BasicAck(ea.DeliveryTag, true);
- }
- else
- {
- //重试几次依然失败
- Console.WriteLine("等待一定时间内失效超时未支付的订单");
- _channel.BasicNack(ea.DeliveryTag, false, false);
- }
- };
- _channel.BasicConsume("delayed_queue", false, consumer);
- return Task.CompletedTask;
- }
- public Task StopAsync(CancellationToken cancellationToken)
- {
- _channel?.Close();
- _connection?.Close();
- _channel?.Dispose();
- _connection?.Dispose();
- return Task.CompletedTask;
- }
- }
复制代码 在Main方法中使用单例模式注册该服务,当然直接将代码写在Main方法也是没有问题的,只不过这种方式方便管理。- static void Main(string[] args)
- {
- var host = new HostBuilder()
- .ConfigureServices((hostContext, services) =>
- {
- services.AddSingleton<IHostedService,ProcessPay>();
- })
- .Build();
- host.Run();
- }
复制代码 支付超时项目
创建一个死信队列服务,用于订阅死信队列中的订单消息,这里我就直接把代码写在Main方法中了- using (var connection = factory.CreateConnection())
- {
- using (var channel = connection.CreateModel())
- {
- channel.ExchangeDeclare("dead_letter_exchange", ExchangeType.Direct, true, false, null);
- channel.QueueDeclare("dead_letter_queue", true, false, false, null);
- channel.QueueBind("dead_letter_queue", "dead_letter_exchange", "dead_letter_routing_key");
-
- var consumer = new EventingBasicConsumer(channel);
- consumer.Received += (model, ea) =>
- {
- var body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- // 处理超时未支付的订单逻辑
- var orderDto = JsonConvert.DeserializeObject<OrderDto>(message);
- orderDto.Status = 2;
- Console.WriteLine($"订单信息:{orderDto.Name},{orderDto.Status}");
- Console.WriteLine("超时未支付");
- channel.BasicAck(ea.DeliveryTag, true);
- };
- channel.BasicConsume("dead_letter_queue", false, consumer);
-
- Console.WriteLine(" Press [enter] to exit.");
- Console.ReadLine();
- }
- }
复制代码 效果展示
代码看不出效果,直接上图。
首先是3个项目各自运行效果图
然后演示正常消费效果
接下来是超时未支付效果
结尾
这就是一个简单的延时队列和死信队列的代码,模拟了支付超时的场景,这里的数据都写死了的,真实运用的时候肯定是中数据库中获取,修改数据库实体的值。然后死信队列是用于处理在一定时间内未被处理的消息,死信交换机也只是一个普通的交换机,只不过他是用于处理超时的消息的交换机。
对于RabbitMQ的文章基本就结束了,可能还会有一篇RabbitMQ集群搭建的文章,但不是很想去写,最近太懒了~
有问题欢迎指出,活到老学到老~
RabbitMQ系列文章
参考资料
- 【【2021最新.NET/C#】RabbitMQ从零到高可用集群】
来源:https://www.cnblogs.com/ZYPLJ/archive/2023/07/30/17591838.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|