翼度科技»论坛 编程开发 .net 查看内容

在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~
首先定义一个事件约定的空接口
  1.     public interface IEvent{}
复制代码
然后定义事件订阅者接口
  1. public interface IEventSubscriber<T> where T : IEvent
  2.     {
  3.         Task HandleAsync(T @event, CancellationToken ct);
  4.         /// <summary>
  5.         /// 执行排序
  6.         /// </summary>
  7.         int Order { get; }
  8.         /// <summary>
  9.         /// 如果发生错误是否抛出异常,将阻塞后续Handler
  10.         /// </summary>
  11.         bool ThrowIfError { get; }
  12.     }
  13.     public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
  14.     {
  15.         public abstract Task HandleAsync(T @event, CancellationToken ct);
  16.         public virtual int Order => 0;
  17.         /// <summary>
  18.         /// 默认不抛出异常
  19.         /// </summary>
  20.         public virtual bool ThrowIfError => false;
  21.     }
复制代码
接着就是发布者
  1. internal class Publisher(IServiceProvider serviceProvider)
  2. {
  3.         public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
  4.         {
  5.                 var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();
  6.                 if (handlers is null) return;
  7.                 foreach (var handler in handlers.OrderBy(x => x.Order))
  8.                 {
  9.                         try
  10.                         {
  11.                                 await handler.HandleAsync(@event, ct);
  12.                         }
  13.                         catch
  14.                         {
  15.                                 if (handler.ThrowIfError)
  16.                                 {
  17.                                         throw;
  18.                                 }
  19.                                 //todo:
  20.                         }
  21.                 }
  22.         }
  23. }
复制代码
到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了
核心代码如下:
  1.         static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);
  2.         static readonly object _lock = new();//锁
  3.         static bool IsToGenericInterface(this Type type, Type baseInterface)
  4.         {
  5.             if (type == null) return false;
  6.             if (baseInterface == null) return false;
  7.             return type.GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);
  8.         }
  9.         static IEnumerable<Type> _eventHanlers = null!;
  10.         static IEnumerable<Type> EventHandlers
  11.         {
  12.             get
  13.             {
  14.                 lock (_lock)
  15.                     return _eventHanlers ??= ASS.InAllRequiredAssemblies.Where(x =>
  16.                     !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber));
  17.             }
  18.         }
  19.                     //注册EventSubscribers
  20.             foreach (var subscriberType in EventSubscribers)
  21.             {
  22.                 //存在一个订阅者订阅多个事件的情况:
  23.                 var baseTypes = subscriberType.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber).ToArray();
  24.                 foreach (var baseType in baseTypes)
  25.                 {
  26.                     services.AddScoped(baseType, subscriberType);
  27.                 }
  28.             }
  29.             //注册Publisher
  30.             services.AddScoped<Publisher>();
  31.                
复制代码
至此发布订阅的代码也就完成了!
现在我们将发布订阅封装到QuickApi中使用:
  1. internal interface IPublisher
  2. {
  3.         /// <summary>
  4.         /// Event Publish
  5.         /// </summary>
  6.         /// <typeparam name="T"></typeparam>
  7.         /// <param name="event">Event</param>
  8.         /// <returns></returns>
  9.         Task PublishAsync<T>(T @event, CancellationToken cancellationToken) where T : IEvent;
  10. }
复制代码
然后BaseQuickApi实现IPublisher接口
  1. internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher
  2. {
  3.     ValueTask<Rsp> ExecuteAsync(Req request);
  4. }
  5. // BaseQuickApi.PublishAsync
  6. public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
  7. {
  8.     using var scope = ServiceRegistration.ServiceProvider.CreateScope();
  9.     var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();
  10.     await publisher.PublishAsync(@event, cancellationToken);
  11. }
复制代码
至此功能完成,接下来我们测试一下:
  1. using Biwen.QuickApi.Events;
  2. using Microsoft.AspNetCore.Mvc;
  3. namespace Biwen.QuickApi.DemoWeb.Apis
  4. {
  5.     public class MyEvent : BaseRequest<MyEvent>,IEvent
  6.     {
  7.         [FromQuery]
  8.         public string? Message { get; set; }
  9.     }
  10.     public class MyEventHandler : EventSubscriber<MyEvent>
  11.     {
  12.         private readonly ILogger<MyEventHandler> _logger;
  13.         public MyEventHandler(ILogger<MyEventHandler> logger)
  14.         {
  15.             _logger = logger;
  16.         }
  17.         public override Task HandleAsync(MyEvent @event, CancellationToken ct)
  18.         {
  19.             _logger.LogInformation($"msg 2 : {@event.Message}");
  20.             return Task.CompletedTask;
  21.         }
  22.     }
  23.     /// <summary>
  24.     /// 更早执行的Handler
  25.     /// </summary>
  26.     public class MyEventHandler2 : EventSubscriber<MyEvent>
  27.     {
  28.         private readonly ILogger<MyEventHandler2> _logger;
  29.         public MyEventHandler2(ILogger<MyEventHandler2> logger)
  30.         {
  31.             _logger = logger;
  32.         }
  33.         public override Task HandleAsync(MyEvent @event, CancellationToken ct)
  34.         {
  35.             _logger.LogInformation($"msg 1 : {@event.Message}");
  36.             return Task.CompletedTask;
  37.         }
  38.         public override int Order => -1;
  39.     }
  40.     /// <summary>
  41.     /// 抛出异常的Handler
  42.     /// </summary>
  43.     public class MyEventHandler3 : EventSubscriber<MyEvent>
  44.     {
  45.         private readonly ILogger<MyEventHandler3> _logger;
  46.         public MyEventHandler3(ILogger<MyEventHandler3> logger)
  47.         {
  48.             _logger = logger;
  49.         }
  50.         public override Task HandleAsync(MyEvent @event, CancellationToken ct)
  51.         {
  52.             throw new Exception("error");
  53.         }
  54.         public override int Order => -2;
  55.         public override bool ThrowIfError => false;
  56.     }
  57.     [QuickApi("event")]
  58.     public class EventApi : BaseQuickApi<MyEvent>
  59.     {
  60.         public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)
  61.         {
  62.             //publish
  63.             await PublishAsync(request);
  64.             return IResultResponse.Content("send event");
  65.         }
  66.     }
  67. }
复制代码
最后我们运行项目测试一下功能:
  1. curl -X 'GET' \
  2.   'http://localhost:5101/quick/event?Message=hello%20world' \
  3.   -H 'accept: */*'
复制代码

源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi

来源:https://www.cnblogs.com/vipwan/p/18184088
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具