翼度科技»论坛 编程开发 .net 查看内容

本地事件总线和事务

8

主题

8

帖子

24

积分

新手上路

Rank: 1

积分
24
本地事件总线和事务

通过重写Ef Core的SaveChanges/SaveChangesAsync来实现事务。当然,如果您愿意实现仓储层,可以在仓储层实现展开对应实体包含的事件,并且调整事件的处理顺序。
Github仓库地址:soda-event-bus
实现AggregateRoot类

AggregateRoot类主要通过一个集合来记录本次事务的所有事件,到保存前再展开读取,在Abp中采用的ICollection记录的本地事件,通过实现一个排序器来保证顺序问题,我这里直接采用了ConcurrentQueue,保证原子操作的同时保证了顺序性,实现更简单一些。
  1. public abstract class AggregateRoot
  2. {
  3.     public ConcurrentQueue<object> LocalEvents { get; } = new();
  4.     public void AddLocalEvent<TEvent>(TEvent eventData) where TEvent : IEvent
  5.     {
  6.         LocalEvents.Enqueue(eventData);
  7.     }
  8.     public bool GetLocalEvent(out object? @event)
  9.     {
  10.         LocalEvents.TryDequeue(out var eventData);
  11.         @event = eventData;
  12.         return @event is not null;
  13.     }
  14.     public void ClearLocalEvents()
  15.     {
  16.         LocalEvents.Clear();
  17.     }
  18. }
复制代码
重写DbContext

主要是从ServiceProvider中获取对应实体类包含的事件,并且找到对应的Handler进行处理,然后再当作一个事务提交。
  1. public class EventBusDbContext<TDbContext> : DbContext
  2.     where TDbContext : DbContext
  3. {
  4.     private readonly IServiceProvider _serviceProvider;
  5.     public EventBusDbContext(DbContextOptions<TDbContext> options, IServiceProvider serviceProvider) : base(options)
  6.     {
  7.         _serviceProvider = serviceProvider;
  8.     }
  9.     public override int SaveChanges()
  10.     {
  11.         return base.SaveChanges();
  12.     }
  13.     public override int SaveChanges(bool acceptAllChangesOnSuccess)
  14.     {
  15.         return base.SaveChanges(acceptAllChangesOnSuccess);
  16.     }
  17.     public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
  18.     {
  19.         await HandleEventsAsync();
  20.         return await base.SaveChangesAsync(cancellationToken);
  21.     }
  22.     public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
  23.     {
  24.         await HandleEventsAsync();
  25.         return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
  26.     }
  27.     private async Task HandleEventsAsync()
  28.     {
  29.         foreach (var entityEntry in ChangeTracker.Entries<AggregateRoot>())
  30.         {
  31.             while (entityEntry.Entity.GetLocalEvent(out var @event))
  32.             {
  33.                 if (@event is null) break;
  34.                 await HandleEventAsync(@event);
  35.             }
  36.             entityEntry.Entity.ClearLocalEvents();
  37.         }
  38.     }
  39.     private async Task HandleEventAsync(object @event)
  40.     {
  41.         var eventHandlerType = typeof(IAsyncEventHandler<>).MakeGenericType(@event.GetType());
  42.         var eventHandler = _serviceProvider.GetRequiredService(eventHandlerType);
  43.         var method = eventHandler.GetType().GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleAsync));
  44.         var exceptionHandleMethod = eventHandlerType.GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleException));
  45.         try
  46.         {
  47.             await (Task)method!.Invoke(eventHandler, new[] { @event })!;
  48.         }
  49.         catch (Exception ex)
  50.         {
  51.             exceptionHandleMethod!.Invoke(eventHandler, new[] { @event, ex });
  52.         }
  53.     }
  54. }
复制代码
分布式事件总线和事务

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

来源:https://www.cnblogs.com/donpangpang/Undeclared/17939853
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具