翼度科技»论坛 编程开发 .net 查看内容

RabbitMQ在.net core中的应用

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
RabbitMQ 是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。

1.基本概念

生产者(Producer)
  1. 生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够发送消息到RabbitMQ服务器。
复制代码
消费者(Consumer)
  1. 消费者是一个接收消息的程序。接收消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够从RabbitMQ服务器接收消息。
复制代码
队列(Queue)
  1. 队列是RabbitMQ的内部对象,用于存储消息。多个生产者可以向一个队列发送消息,多个消费者可以尝试从一个队列接收消息。队列支持多种消息分发策略。
复制代码
交换机(Exchange)
  1. 交换机是消息的分发中心。它接收来自生产者的消息,然后将这些消息分发给队列。交换机有多种类型,包括直连交换机、主题交换机、扇形交换机、头交换机。
复制代码
绑定(Binding)
  1. 绑定是交换机和队列之间的关联关系。绑定可以使用路由键进行绑定,也可以使用通配符进行绑定。
复制代码
路由键(Routing Key)
  1. 路由键是生产者发送消息时附带的一个属性。路由键的作用是决定消息被分发到哪个队列。
复制代码
通配符(Wildcard)
  1. 通配符是一种模式匹配的方式。RabbitMQ支持两种通配符:`*`和`#`。
复制代码
绑定键(Binding Key)
  1. 绑定键是交换机和队列之间的关联关系。绑定键可以使用路由键进行绑定,也可以使用通配符进行绑定。
复制代码
持久化(Durable)
  1. 持久化是指RabbitMQ服务器重启后,消息是否还存在。持久化可以应用到交换机、队列、绑定、消息等。
复制代码
确认机制(Acknowledge)
  1. 确认机制是指消费者接收到消息后,向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。
  2. 自动确认
  3.         消费者接收到消息后,RabbitMQ服务器会自动删除这条消息。
  4. 手动确认
  5.         消费者接收到消息后,需要向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。
复制代码
拒绝机制(Reject)
  1. 拒绝机制是指消费者接收到消息后,向RabbitMQ服务器发送一个拒绝消息。RabbitMQ服务器收到拒绝消息后,会将这条消息重新发送给其他消费者。
复制代码
死信队列(Dead Letter Queue)
  1. 死信队列是指消息被拒绝、过期或者达到最大重试次数后,会被发送到死信队列。
复制代码
消息过期(Message TTL)
  1. 消息过期是指消息在指定时间内没有被消费者消费,会被删除。
复制代码
消息优先级(Message Priority)
  1. 消息优先级是指消息在队列中的优先级。消息优先级高的消息会被优先消费。
复制代码
消息分发
  1. 消息分发是指消息在队列中的分发策略。消息分发策略包括轮询分发、公平分发、负载均衡分发。
复制代码
2.环境搭建

Docker 安装 RabbitMQ
  1. docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
复制代码

  • -d:后台运行
  • --restart:重启策略
  • --name:容器名称
  • -p:端口映射
  • --hostname:主机名
  • -e:环境变量

    • RABBITMQ_DEFAULT_USER:默认用户名
    • RABBITMQ_DEFAULT_PASS:默认密码

  • TZ:时区
  • rabbitmq:management:镜像名称
Docker Compose 安装 RabbitMQ
  1. version: "3.1"
  2. services:
  3. rabbitmq:
  4.     restart: always
  5.     image: rabbitmq:management
  6.     container_name: rabbitmq
  7.     hostname: my-rabbit
  8.     ports:
  9.         - 5672:5672
  10.         - 15672:15672 # RabbitMQ管理界面端口
  11.     environment:
  12.         TZ: Asia/Shanghai
  13.             RABBITMQ_DEFAULT_USER: admin
  14.             RABBITMQ_DEFAULT_PASS: admin
复制代码

  • restart:重启策略
  • image:镜像名称
  • container_name:容器名称
  • hostname:主机名
  • ports:端口映射
  • environment:环境变量

    • TZ:时区
    • RABBITMQ_DEFAULT_USER:默认用户名
    • RABBITMQ_DEFAULT_PASS:默认密码

  • rabbitmq:management:镜像名称
3.使用

客户端SDK代码在GitHub:https://github.com/Tangtang1997/IKunLibrary
新建 TestRequest 类,实现 IRabbitMqRequest 接口,定义消息体
  1. public class TestRequest : IRabbitMqRequest
  2. {
  3. /// <summary>
  4. /// 重试次数
  5. /// </summary>
  6. public int RetryCount { get; set; }
  7. #region 自定义字段
  8. /// <summary>
  9. /// id
  10. /// </summary>
  11. public string Id { get; set; } = default!;
  12. /// <summary>
  13. /// 名称
  14. /// </summary>
  15. public string Name { get; set; } = default!;
  16. /// <summary>
  17. /// 年龄
  18. /// </summary>
  19. public int Age { get; set; }
  20. #endregion
  21. }
复制代码
新建TestRequestHandler类,实现IRabbitMqRequestHandler接口,处理消息
  1. public class TestRequestHanlder : IRequestProcessorHandler<TestRequest>
  2. {
  3. private readonly ILogger<TestRequestHanlder> _logger;
  4. public TestRequestHanlder(ILogger<TestRequestHanlder> logger)
  5. {
  6.     _logger = logger;
  7. }
  8. public Task StartAsync(CancellationToken cancellationToken)
  9. {
  10.     return Task.CompletedTask;
  11. }
  12. public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
  13. {
  14.     return Task.CompletedTask;
  15. }
  16. public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
  17. {
  18.     _logger.LogInformation($"开始处理消息: {request.Id}");
  19.     //模拟处理消息耗时操作
  20.     await Task.Delay(1000, cancellationToken);
  21.     _logger.LogInformation($"消息处理完成: {request.Id}");
  22.     }
  23. }
复制代码
使用 IHostedService 来托管服务
  1. public class SampleHostedService : IHostedService
  2. {
  3. private readonly IConsumerProcessorManager<TestRequest> _consumerProcessorManager;
  4. private readonly IHostApplicationLifetime _applicationLifetime;
  5. private readonly ILogger<SampleHostedService> _logger;
  6. public SampleHostedService(
  7.     IConsumerProcessorManager<TestRequest> consumerProcessorManager,
  8.     IHostApplicationLifetime applicationLifetime,
  9.     ILogger<SampleHostedService> logger)
  10. {
  11.     _consumerProcessorManager = consumerProcessorManager;
  12.     _applicationLifetime = applicationLifetime;
  13.     _logger = logger;
  14. }
  15. public async Task StartAsync(CancellationToken cancellationToken)
  16. {
  17.     _applicationLifetime.ApplicationStarted.Register(() =>
  18.     {
  19.         _logger.LogInformation("SampleHostedService is starting.");
  20.         _consumerProcessorManager.StartAsync(cancellationToken);
  21.     });
  22.     _applicationLifetime.ApplicationStopping.Register(() =>
  23.     {
  24.         _logger.LogInformation("SampleHostedService is stopping.");
  25.         _consumerProcessorManager.StopAsync(3000, cancellationToken);
  26.     });
  27.     await Task.CompletedTask;
  28. }
  29. public async Task StopAsync(CancellationToken cancellationToken)
  30. {
  31.     await Task.CompletedTask;
  32.     }
  33. }
复制代码
注册并启用服务
  1. IHost host = Host.CreateDefaultBuilder(args)
  2.     .ConfigureServices(services =>
  3. {
  4.     services.AddHostedService<SampleHostedService>();
  5.     var configuration = services.BuildServiceProvider().GetRequiredService<IConfiguration>();
  6.     var hostName = configuration["RabbitMq:Host"] ?? throw new Exception("HostName is not configured");
  7.     var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new Exception("Port is not configured"));
  8.     var userName = configuration["RabbitMq:Username"] ?? throw new Exception("Username is not configured");
  9.     var password = configuration["RabbitMq:Password"] ?? throw new Exception("Password is not configured");
  10.     var queueName = configuration["RabbitMq:QueueName"] ?? throw new Exception("QueueName is not configured");
  11.     services.AddRabbitMq<TestRequest, TestRequestHanlder>(options =>
  12.     {
  13.         options.UseSsl = false;
  14.         options.HostName = hostName;
  15.         options.Port = port;
  16.         options.UserName = userName;
  17.         options.Password = password;
  18.         options.Durable = true;
  19.         options.NetworkRecoveryInterval = 10000;
  20.         options.ExchangeType = ExchangeType.Direct;
  21.         options.QueueName = queueName;
  22.         options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
  23.         options.RoutingKey = $"{queueName}_ROUTING_KEY";
  24.         options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
  25.         options.DeadLetterQueueName = $"{queueName}_DEAD";
  26.         options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
  27.     });
  28. })
  29. .Build();
  30. await host.RunAsync();
复制代码
4.参考资料

https://www.cnblogs.com/Tangtang1997/p/18067763

来源:https://www.cnblogs.com/chenshibao/p/18429050
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具