在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)
|
闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~
首先定义一个事件约定的空接口- public interface IEvent{}
复制代码 然后定义事件订阅者接口- public interface IEventSubscriber<T> where T : IEvent
- {
- Task HandleAsync(T @event, CancellationToken ct);
- /// <summary>
- /// 执行排序
- /// </summary>
- int Order { get; }
- /// <summary>
- /// 如果发生错误是否抛出异常,将阻塞后续Handler
- /// </summary>
- bool ThrowIfError { get; }
- }
- public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
- {
- public abstract Task HandleAsync(T @event, CancellationToken ct);
- public virtual int Order => 0;
- /// <summary>
- /// 默认不抛出异常
- /// </summary>
- public virtual bool ThrowIfError => false;
- }
复制代码 接着就是发布者- internal class Publisher(IServiceProvider serviceProvider)
- {
- public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
- {
- var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();
- if (handlers is null) return;
- foreach (var handler in handlers.OrderBy(x => x.Order))
- {
- try
- {
- await handler.HandleAsync(@event, ct);
- }
- catch
- {
- if (handler.ThrowIfError)
- {
- throw;
- }
- //todo:
- }
- }
- }
- }
复制代码 到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了
核心代码如下:- static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);
- static readonly object _lock = new();//锁
- static bool IsToGenericInterface(this Type type, Type baseInterface)
- {
- if (type == null) return false;
- if (baseInterface == null) return false;
- return type.GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);
- }
- static IEnumerable<Type> _eventHanlers = null!;
- static IEnumerable<Type> EventHandlers
- {
- get
- {
- lock (_lock)
- return _eventHanlers ??= ASS.InAllRequiredAssemblies.Where(x =>
- !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber));
- }
- }
- //注册EventSubscribers
- foreach (var subscriberType in EventSubscribers)
- {
- //存在一个订阅者订阅多个事件的情况:
- var baseTypes = subscriberType.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber).ToArray();
- foreach (var baseType in baseTypes)
- {
- services.AddScoped(baseType, subscriberType);
- }
- }
- //注册Publisher
- services.AddScoped<Publisher>();
-
复制代码 至此发布订阅的代码也就完成了!
现在我们将发布订阅封装到QuickApi中使用:- internal interface IPublisher
- {
- /// <summary>
- /// Event Publish
- /// </summary>
- /// <typeparam name="T"></typeparam>
- /// <param name="event">Event</param>
- /// <returns></returns>
- Task PublishAsync<T>(T @event, CancellationToken cancellationToken) where T : IEvent;
- }
复制代码 然后BaseQuickApi实现IPublisher接口- internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher
- {
- ValueTask<Rsp> ExecuteAsync(Req request);
- }
- // BaseQuickApi.PublishAsync
- public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
- {
- using var scope = ServiceRegistration.ServiceProvider.CreateScope();
- var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();
- await publisher.PublishAsync(@event, cancellationToken);
- }
复制代码 至此功能完成,接下来我们测试一下:- using Biwen.QuickApi.Events;
- using Microsoft.AspNetCore.Mvc;
- namespace Biwen.QuickApi.DemoWeb.Apis
- {
- public class MyEvent : BaseRequest<MyEvent>,IEvent
- {
- [FromQuery]
- public string? Message { get; set; }
- }
- public class MyEventHandler : EventSubscriber<MyEvent>
- {
- private readonly ILogger<MyEventHandler> _logger;
- public MyEventHandler(ILogger<MyEventHandler> logger)
- {
- _logger = logger;
- }
- public override Task HandleAsync(MyEvent @event, CancellationToken ct)
- {
- _logger.LogInformation($"msg 2 : {@event.Message}");
- return Task.CompletedTask;
- }
- }
- /// <summary>
- /// 更早执行的Handler
- /// </summary>
- public class MyEventHandler2 : EventSubscriber<MyEvent>
- {
- private readonly ILogger<MyEventHandler2> _logger;
- public MyEventHandler2(ILogger<MyEventHandler2> logger)
- {
- _logger = logger;
- }
- public override Task HandleAsync(MyEvent @event, CancellationToken ct)
- {
- _logger.LogInformation($"msg 1 : {@event.Message}");
- return Task.CompletedTask;
- }
- public override int Order => -1;
- }
- /// <summary>
- /// 抛出异常的Handler
- /// </summary>
- public class MyEventHandler3 : EventSubscriber<MyEvent>
- {
- private readonly ILogger<MyEventHandler3> _logger;
- public MyEventHandler3(ILogger<MyEventHandler3> logger)
- {
- _logger = logger;
- }
- public override Task HandleAsync(MyEvent @event, CancellationToken ct)
- {
- throw new Exception("error");
- }
- public override int Order => -2;
- public override bool ThrowIfError => false;
- }
- [QuickApi("event")]
- public class EventApi : BaseQuickApi<MyEvent>
- {
- public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)
- {
- //publish
- await PublishAsync(request);
- return IResultResponse.Content("send event");
- }
- }
- }
复制代码 最后我们运行项目测试一下功能:- curl -X 'GET' \
- 'http://localhost:5101/quick/event?Message=hello%20world' \
- -H 'accept: */*'
复制代码
源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
来源:https://www.cnblogs.com/vipwan/p/18184088
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|
|
|
发表于 2024-5-11 05:38:31
举报
回复
分享
|
|
|
|