蒋传领 发表于 2024-5-11 05:38:31

在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)

闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~
首先定义一个事件约定的空接口
    public interface IEvent{}然后定义事件订阅者接口
public interface IEventSubscriber<T> where T : IEvent
    {
      Task HandleAsync(T @event, CancellationToken ct);
      /// <summary>
      /// 执行排序
      /// </summary>
      int Order { get; }

      /// <summary>
      /// 如果发生错误是否抛出异常,将阻塞后续Handler
      /// </summary>
      bool ThrowIfError { get; }
    }
    public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
    {
      public abstract Task HandleAsync(T @event, CancellationToken ct);
      public virtual int Order => 0;
      /// <summary>
      /// 默认不抛出异常
      /// </summary>
      public virtual bool ThrowIfError => false;
    }接着就是发布者
internal class Publisher(IServiceProvider serviceProvider)
{
        public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
        {
                var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();
                if (handlers is null) return;
                foreach (var handler in handlers.OrderBy(x => x.Order))
                {
                        try
                        {
                                await handler.HandleAsync(@event, ct);
                        }
                        catch
                        {
                                if (handler.ThrowIfError)
                                {
                                        throw;
                                }
                                //todo:
                        }
                }
        }
}到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了
核心代码如下:
      static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);
      static readonly object _lock = new();//锁
      static bool IsToGenericInterface(this Type type, Type baseInterface)
      {
            if (type == null) return false;
            if (baseInterface == null) return false;
            return type.GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);
      }
      static IEnumerable<Type> _eventHanlers = null!;
      static IEnumerable<Type> EventHandlers
      {
            get
            {
                lock (_lock)
                  return _eventHanlers ??= ASS.InAllRequiredAssemblies.Where(x =>
                  !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber));
            }
      }
                  //注册EventSubscribers
            foreach (var subscriberType in EventSubscribers)
            {
                //存在一个订阅者订阅多个事件的情况:
                var baseTypes = subscriberType.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber).ToArray();
                foreach (var baseType in baseTypes)
                {
                  services.AddScoped(baseType, subscriberType);
                }
            }
            //注册Publisher
            services.AddScoped<Publisher>();
                至此发布订阅的代码也就完成了!
现在我们将发布订阅封装到QuickApi中使用:
internal interface IPublisher
{
        /// <summary>
        /// Event Publish
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="event">Event</param>
        /// <returns></returns>
        Task PublishAsync<T>(T @event, CancellationToken cancellationToken) where T : IEvent;
}然后BaseQuickApi实现IPublisher接口
internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher
{
    ValueTask<Rsp> ExecuteAsync(Req request);
}

// BaseQuickApi.PublishAsync
public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
{
    using var scope = ServiceRegistration.ServiceProvider.CreateScope();
    var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();
    await publisher.PublishAsync(@event, cancellationToken);
}至此功能完成,接下来我们测试一下:
using Biwen.QuickApi.Events;
using Microsoft.AspNetCore.Mvc;

namespace Biwen.QuickApi.DemoWeb.Apis
{
    public class MyEvent : BaseRequest<MyEvent>,IEvent
    {
      
      public string? Message { get; set; }
    }

    public class MyEventHandler : EventSubscriber<MyEvent>
    {
      private readonly ILogger<MyEventHandler> _logger;
      public MyEventHandler(ILogger<MyEventHandler> logger)
      {
            _logger = logger;
      }

      public override Task HandleAsync(MyEvent @event, CancellationToken ct)
      {
            _logger.LogInformation($"msg 2 : {@event.Message}");
            return Task.CompletedTask;
      }
    }

    /// <summary>
    /// 更早执行的Handler
    /// </summary>
    public class MyEventHandler2 : EventSubscriber<MyEvent>
    {
      private readonly ILogger<MyEventHandler2> _logger;
      public MyEventHandler2(ILogger<MyEventHandler2> logger)
      {
            _logger = logger;
      }

      public override Task HandleAsync(MyEvent @event, CancellationToken ct)
      {
            _logger.LogInformation($"msg 1 : {@event.Message}");
            return Task.CompletedTask;
      }

      public override int Order => -1;

    }

    /// <summary>
    /// 抛出异常的Handler
    /// </summary>
    public class MyEventHandler3 : EventSubscriber<MyEvent>
    {
      private readonly ILogger<MyEventHandler3> _logger;
      public MyEventHandler3(ILogger<MyEventHandler3> logger)
      {
            _logger = logger;
      }

      public override Task HandleAsync(MyEvent @event, CancellationToken ct)
      {
            throw new Exception("error");
      }

      public override int Order => -2;

      public override bool ThrowIfError => false;

    }

   
    public class EventApi : BaseQuickApi<MyEvent>
    {
      public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)
      {
            //publish
            await PublishAsync(request);
            return IResultResponse.Content("send event");
      }
    }
}最后我们运行项目测试一下功能:
curl -X 'GET' \
'http://localhost:5101/quick/event?Message=hello%20world' \
-H 'accept: */*'
源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi

来源:https://www.cnblogs.com/vipwan/p/18184088
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)