翼度科技»论坛 编程开发 .net 查看内容

.Net Core对于`RabbitMQ`封装分布式事件总线

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
首先我们需要了解到分布式事件总线是什么;

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。
例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。
然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  • RabbitMQ的主要功能包括:

    • 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
    • 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
    • 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
    • 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
    • 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件
实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现
在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用
  1. <ItemGroup>
  2.         <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
  3.     <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
  4.     <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
  5.     <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
  6. </ItemGroup>
复制代码
创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义
EventsBusOptions.cs:
  1. namespace EventsBus.Contract;
  2. public class EventsBusOptions
  3. {
  4.     /// <summary>
  5.     /// 接收时异常事件
  6.     /// </summary>
  7.     public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
  8. }
复制代码
IEventsBusHandle.cs:
  1. namespace EventsBus.Contract;
  2. public interface IEventsBusHandle<in TEto> where TEto : class
  3. {
  4.     Task HandleAsync(TEto eventData);
  5. }
复制代码
ILoadEventBus.cs:
  1. namespace EventsBus.Contract;
  2. public interface ILoadEventBus
  3. {
  4.     /// <summary>
  5.     /// 发布事件
  6.     /// </summary>
  7.     /// <param name="eto"></param>
  8.     /// <typeparam name="TEto"></typeparam>
  9.     /// <returns></returns>
  10.     Task PushAsync<TEto>(TEto eto) where TEto : class;
  11. }
复制代码
EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道
  1. namespace EventsBus.RabbitMQ;
  2. [AttributeUsage(AttributeTargets.Class)]
  3. public class EventsBusAttribute : Attribute
  4. {
  5.     public readonly string Name;
  6.     public EventsBusAttribute(string name)
  7.     {
  8.         Name = name;
  9.     }
  10. }
复制代码
然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.Contract的RabbitMQ实现
创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs
Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了
  1. using EventsBus.Contract;
  2. using EventsBus.RabbitMQ;
  3. using EventsBus.RabbitMQ.Options;
  4. using Microsoft.Extensions.Configuration;
  5. namespace Microsoft.Extensions.DependencyInjection;
  6. public static class EventsBusRabbitMQExtensions
  7. {
  8.     public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
  9.         IConfiguration configuration)
  10.     {
  11.         services.AddSingleton<RabbitMQFactory>();
  12.         services.AddSingleton(typeof(RabbitMQEventsManage<>));
  13.         services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
  14.         services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
  15.         
  16.         return services;
  17.     }
  18. }
复制代码
Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。
  1. using RabbitMQ.Client;
  2. namespace EventsBus.RabbitMQ.Options;
  3. public class RabbitMQOptions
  4. {
  5.     /// <summary>
  6.     /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
  7.     /// 指示应使用的协议的缺省值。
  8.     /// </summary>
  9.     public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;
  10.     /// <summary>
  11.     /// 地址
  12.     /// </summary>
  13.     public string HostName { get; set; }
  14.     /// <summary>
  15.     /// 账号
  16.     /// </summary>
  17.     public string UserName { get; set; }
  18.     /// <summary>
  19.     /// 密码
  20.     /// </summary>
  21.     public string Password { get; set; }
  22. }
复制代码
RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序
  1. using System.Reflection;
  2. using System.Text.Json;
  3. using EventsBus.Contract;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using RabbitMQ.Client;
  6. using RabbitMQ.Client.Events;
  7. namespace EventsBus.RabbitMQ;
  8. public class RabbitMQEventsManage<TEto> where TEto : class
  9. {
  10.     private readonly IServiceProvider _serviceProvider;
  11.     private readonly RabbitMQFactory _rabbitMqFactory;
  12.     public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
  13.     {
  14.         _serviceProvider = serviceProvider;
  15.         _rabbitMqFactory = rabbitMqFactory;
  16.         _ = Task.Run(Start);
  17.     }
  18.     private void Start()
  19.     {
  20.         var channel = _rabbitMqFactory.CreateRabbitMQ();
  21.         var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
  22.         var name = eventBus?.Name ?? typeof(TEto).Name;
  23.         channel.QueueDeclare(name, false, false, false, null);
  24.         var consumer = new EventingBasicConsumer(channel); //消费者
  25.         channel.BasicConsume(name, true, consumer); //消费消息
  26.         consumer.Received += async (model, ea) =>
  27.         {
  28.             var bytes = ea.Body.ToArray();
  29.             try
  30.             {
  31.                 // 这样就可以实现多个订阅
  32.                 var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
  33.                 foreach (var handle in events)
  34.                 {
  35.                     await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
  36.                 }
  37.             }
  38.             catch (Exception e)
  39.             {
  40.                 EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
  41.             }
  42.         };
  43.     }
  44. }
复制代码
RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂
  1. using EventsBus.RabbitMQ.Options;
  2. using Microsoft.Extensions.Options;
  3. using RabbitMQ.Client;
  4. namespace EventsBus.RabbitMQ;
  5. public class RabbitMQFactory : IDisposable
  6. {
  7.     private readonly RabbitMQOptions _options;
  8.     private readonly ConnectionFactory _factory;
  9.     private IConnection? _connection;
  10.     public RabbitMQFactory(IOptions<RabbitMQOptions> options)
  11.     {
  12.         _options = options?.Value;
  13.         // 将Options中的参数添加到ConnectionFactory
  14.         _factory = new ConnectionFactory
  15.         {
  16.             HostName = _options.HostName,
  17.             UserName = _options.UserName,
  18.             Password = _options.Password,
  19.             Port = _options.Port
  20.         };
  21.     }
  22.     public IModel CreateRabbitMQ()
  23.     {
  24.         // 当第一次创建RabbitMQ的时候进行链接
  25.         _connection ??= _factory.CreateConnection();
  26.         return _connection.CreateModel();
  27.     }
  28.     public void Dispose()
  29.     {
  30.         _connection?.Dispose();
  31.     }
  32. }
复制代码
RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现
  1. using System.Reflection;
  2. using System.Text.Json;
  3. using EventsBus.Contract;
  4. using Microsoft.Extensions.DependencyInjection;
  5. namespace EventsBus.RabbitMQ;
  6. public class RabbitMQLoadEventBus : ILoadEventBus
  7. {
  8.     private readonly IServiceProvider _serviceProvider;
  9.     private readonly RabbitMQFactory _rabbitMqFactory;
  10.     public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
  11.     {
  12.         _serviceProvider = serviceProvider;
  13.         _rabbitMqFactory = rabbitMqFactory;
  14.     }
  15.     public async Task PushAsync<TEto>(TEto eto) where TEto : class
  16.     {
  17.         //创建一个通道
  18.         //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
  19.         using var channel = _rabbitMqFactory.CreateRabbitMQ();
  20.         
  21.         // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
  22.         var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
  23.         var name = eventBus?.Name ?? typeof(TEto).Name;
  24.         
  25.         // 使用获取的名称创建一个通道
  26.         channel.QueueDeclare(name, false, false, false, null);
  27.         var properties = channel.CreateBasicProperties();
  28.         properties.DeliveryMode = 1;
  29.         // 将数据序列号,然后发布
  30.         channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息
  31.         // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
  32.         var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
  33.         
  34.         await Task.CompletedTask;
  35.     }
  36. }
复制代码
在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;
然后我们需要使用RabbitMQ分布式事件总线工具包
使用RabbitMQ分布式事件总线的示例

首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件
  1. version: '3.1'
  2. services:
  3.   rabbitmq:
  4.     restart: always # 开机自启
  5.     image: rabbitmq:3.11-management # RabbitMQ使用的镜像
  6.     container_name: rabbitmq # docker名称
  7.     hostname: rabbit
  8.     ports:
  9.       - 5672:5672 # 只是RabbitMQ SDK使用的端口
  10.       - 15672:15672 # 这是RabbitMQ管理界面使用的端口
  11.     environment:
  12.       TZ: Asia/Shanghai # 设置RabbitMQ时区
  13.       RABBITMQ_DEFAULT_USER: token # rabbitMQ账号
  14.       RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密码
  15.     volumes:
  16.       - ./data:/var/lib/rabbitmq
复制代码
启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用
  1. <Project Sdk="Microsoft.NET.Sdk.Web">
  2.     <PropertyGroup>
  3.         <TargetFramework>net7.0</TargetFramework>
  4.         <Nullable>enable</Nullable>
  5.         <ImplicitUsings>enable</ImplicitUsings>
  6.     </PropertyGroup>
  7.     <ItemGroup>
  8.         <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" />
  9.         <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
  10.     </ItemGroup>
  11.     <ItemGroup>
  12.         
  13.         <ProjectReference Include="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" />
  14.     </ItemGroup>
  15. </Project>
复制代码
修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件![image-20230211022801105]

在这里注入的时候将配置注入好了
  1. {
  2.   "Logging": {
  3.     "LogLevel": {
  4.       "Default": "Information",
  5.       "Microsoft.AspNetCore": "Warning"
  6.     }
  7.   },
  8.   "AllowedHosts": "*",
  9.   "RabbitMQOptions": {
  10.     "HostName": "127.0.0.1",
  11.     "UserName": "token",
  12.     "Password": "dd666666"
  13.   }
  14. }
复制代码
创建DemoEto.cs文件:
  1. using EventsBus.RabbitMQ;
  2. namespace Demo;
  3. [EventsBus("Demo")]
  4. public class DemoEto
  5. {
  6.     public int Size { get; set; }
  7.    
  8.     public string Value { get; set; }
  9. }
复制代码
创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序
  1. using System.Text.Json;
  2. using EventsBus.Contract;
  3. namespace Demo;
  4. /// <summary>
  5. /// 事件处理服务,相当于订阅事件
  6. /// </summary>
  7. public class DemoEventsBusHandle : IEventsBusHandle<DemoEto>
  8. {
  9.     public async Task HandleAsync(DemoEto eventData)
  10.     {
  11.         Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");
  12.         await Task.CompletedTask;
  13.     }
  14. }
复制代码
打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务
  1. using Demo;
  2. using EventsBus.Contract;
  3. var builder = WebApplication.CreateBuilder(args);
  4. builder.Services.AddControllers();
  5. builder.Services.AddEndpointsApiExplorer();
  6. builder.Services.AddSwaggerGen();
  7. // 注入事件处理服务
  8. builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));
  9. // 注入RabbitMQ服务
  10. builder.Services.AddEventsBusRabbitMQ(builder.Configuration);
  11. var app = builder.Build();
  12. // 只有在Development显示Swagger
  13. if (app.Environment.IsDevelopment())
  14. {
  15.     app.UseSwagger();
  16.     app.UseSwaggerUI();
  17. }
  18. // 强制Https
  19. app.UseHttpsRedirection();
  20. app.UseAuthorization();
  21. app.MapControllers();
  22. app.Run();
复制代码
创建Controllers\EventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;
  1. using EventsBus.Contract;
  2. using Microsoft.AspNetCore.Mvc;
  3. namespace Demo.Controllers;
  4. [ApiController]
  5. [Route("[controller]")]
  6. public class EventBusController : ControllerBase
  7. {
  8.     private readonly ILoadEventBus _loadEventBus;
  9.     public EventBusController(ILoadEventBus loadEventBus)
  10.     {
  11.         _loadEventBus = loadEventBus;
  12.     }
  13.     /// <summary>
  14.     /// 发送信息
  15.     /// </summary>
  16.     /// <param name="eto"></param>
  17.     [HttpPost]
  18.     public async Task Send(DemoEto eto)
  19.     {
  20.         await _loadEventBus.PushAsync(eto);
  21.     }
  22. }
复制代码
然后我们启动程序会打开Swagger调试界面:

然后我们发送一下事件:

我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现,
来自Token的分享
技术交流群:737776595

来源:https://www.cnblogs.com/hejiale010426/archive/2023/02/11/17110806.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具