翼度科技»论坛 编程开发 .net 查看内容

实现一个事件总线

11

主题

11

帖子

33

积分

新手上路

Rank: 1

积分
33
使用 C# 实现一个 Event Bus

Event Bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。它允许不同的组件通过发布和订阅事件来进行解耦和通信。
在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先,我们有两个基本的约束接口:IEvent和IAsyncEventHandler。IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。
接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。其中,Publish和PublishAsync方法用于发布事件,而OnSubscribe方法用于订阅事件。
然后,我们看到一个实现了本地事件总线的类LocalEventBusManager。它实现了ILocalEventBusManager接口,用于在单一管道内处理本地事件。它使用了一个Channel来存储事件,并提供了发布事件的方法Publish和PublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle。
总的来说,Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址:soda-event-bus
实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler where TEvent:IEvent来约束事件的处理程序。
  1. public interface IEvent
  2. {
  3. }
  4. public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
  5. {
  6.     Task HandleAsync(IEvent @event);
  7.     void HandleException(IEvent @event, Exception ex);
  8. }
复制代码
接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。
  1. public interface IEventBus
  2. {
  3.     void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
  4.     Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
  5.     void OnSubscribe<TEvent>() where TEvent : IEvent;
  6. }
复制代码
实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。
LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。
  1. public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
  2. {
  3.     void Publish(TEvent @event);
  4.     Task PublishAsync(TEvent @event) ;
  5.    
  6.     void AutoHandle();
  7. }
  8. public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>
  9.     where TEvent: IEvent
  10. {
  11.     readonly IServiceProvider _servicesProvider = serviceProvider;
  12.     private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
  13.     public void Publish(TEvent @event)
  14.     {
  15.         Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
  16.         _eventChannel.Writer.WriteAsync(@event);
  17.     }
  18.     private CancellationTokenSource Cts { get; } = new();
  19.     public void Cancel()
  20.     {
  21.         Cts.Cancel();
  22.     }
  23.    
  24.     public async Task PublishAsync(TEvent @event)
  25.     {
  26.         await _eventChannel.Writer.WriteAsync(@event);
  27.     }
  28.     public void AutoHandle()
  29.     {
  30.         // 确保只启动一次
  31.         if (!Cts.IsCancellationRequested) return;
  32.         Task.Run(async () =>
  33.         {
  34.             while (!Cts.IsCancellationRequested)
  35.             {
  36.                 var reader = await _eventChannel.Reader.ReadAsync();
  37.                 await HandleAsync(reader);
  38.             }
  39.         }, Cts.Token);
  40.     }
  41.     async Task HandleAsync(TEvent @event)
  42.     {
  43.         var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
  44.         if (handler is null)
  45.         {
  46.             throw new NullReferenceException($"No handler for event {@event.GetType().Name}");
  47.         }
  48.         try
  49.         {
  50.             await handler.HandleAsync(@event);
  51.         }
  52.         catch (Exception ex)
  53.         {
  54.             handler.HandleException( @event, ex);
  55.         }
  56.     }
  57. }
复制代码
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。
  1. public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
  2. {
  3.     private readonly IServiceProvider _serviceProvider = serviceProvider;
  4.     private class ChannelKey
  5.     {
  6.         public required string Key { get; init; }
  7.         public int Subscribers { get; set; }
  8.         public override bool Equals(object? obj)
  9.         {
  10.             if (obj is ChannelKey key)
  11.             {
  12.                 return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
  13.             }
  14.             return false;
  15.         }
  16.         public override int GetHashCode()
  17.         {
  18.             return 0;
  19.         }
  20.     }
  21.     private Channel<IEvent> Rent(string channel)
  22.     {
  23.         _channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);
  24.         if (value != null) return value;
  25.         value = Channel.CreateUnbounded<IEvent>();
  26.         _channels.TryAdd(new ChannelKey() { Key = channel }, value);
  27.         return value;
  28.     }
  29.     private Channel<IEvent> Rent(ChannelKey channelKey)
  30.     {
  31.         _channels.TryGetValue(channelKey, out var value);
  32.         if (value != null) return value;
  33.         value = Channel.CreateUnbounded<IEvent>();
  34.         _channels.TryAdd(channelKey, value);
  35.         return value;
  36.     }
  37.     private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
  38.     private CancellationTokenSource Cts { get; } = new();
  39.     public void Cancel()
  40.     {
  41.         Cts.Cancel();
  42.         _channels.Clear();
  43.         Cts.TryReset();
  44.     }
  45.     public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
  46.     {
  47.         await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
  48.     }
  49.     public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
  50.     {
  51.         Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
  52.     }
  53.     public void OnSubscribe<TEvent>() where TEvent : IEvent
  54.     {
  55.         var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
  56.                          new ChannelKey() { Key = typeof(TEvent).Name };
  57.         channelKey.Subscribers++;
  58.         Task.Run(async () =>
  59.         {
  60.             try
  61.             {
  62.                 while (!Cts.IsCancellationRequested)
  63.                 {
  64.                     var @event = await ReadAsync(channelKey);
  65.                     var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
  66.                     if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
  67.                     try
  68.                     {
  69.                         await handler.HandleAsync((TEvent)@event);
  70.                     }
  71.                     catch (Exception ex)
  72.                     {
  73.                         handler.HandleException((TEvent)@event, ex);
  74.                     }
  75.                 }
  76.             }
  77.             catch (Exception e)
  78.             {
  79.                 throw new InvalidOperationException("Error on onSubscribe handler", e);
  80.             }
  81.         }, Cts.Token);
  82.     }
  83.     private async Task<IEvent> ReadAsync(string channel)
  84.     {
  85.         return await Rent(channel).Reader.ReadAsync(Cts.Token);
  86.     }
  87.     private async Task<IEvent> ReadAsync(ChannelKey channel)
  88.     {
  89.         return await Rent(channel).Reader.ReadAsync(Cts.Token);
  90.     }
  91. }
复制代码
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。
  1. public interface ILocalEventBus: IEventBus
  2. {
  3. }
  4. public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
  5. {
  6.     private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
  7.    
  8.    
  9.     public void Publish<TEvent>(TEvent @event) where TEvent : IEvent
  10.     {
  11.         if (options.Pool)
  12.         {
  13.             Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
  14.             EventBusPool.Publish(@event);
  15.         }
  16.         else
  17.         {
  18.             var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
  19.             if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
  20.             manager.Publish(@event);
  21.         }
  22.     }
  23.     public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
  24.     {
  25.         if (options.Pool)
  26.         {
  27.             Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
  28.             await EventBusPool.PublishAsync(@event);
  29.         }
  30.         else
  31.         {
  32.             var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
  33.             if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
  34.             await manager.PublishAsync(@event);
  35.         }
  36.     }
  37.     public void OnSubscribe<TEvent>() where TEvent : IEvent
  38.     {
  39.         if (options.Pool)
  40.         {
  41.             Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
  42.             EventBusPool.OnSubscribe<TEvent>();
  43.         }
  44.         else
  45.         {
  46.             var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
  47.             if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
  48.             manager.AutoHandle();
  49.         }
  50.     }
  51. }
复制代码
分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

来源:https://www.cnblogs.com/donpangpang/Undeclared/17939849
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具