|
本地事件总线和事务
通过重写Ef Core的SaveChanges/SaveChangesAsync来实现事务。当然,如果您愿意实现仓储层,可以在仓储层实现展开对应实体包含的事件,并且调整事件的处理顺序。
Github仓库地址:soda-event-bus
实现AggregateRoot类
AggregateRoot类主要通过一个集合来记录本次事务的所有事件,到保存前再展开读取,在Abp中采用的ICollection记录的本地事件,通过实现一个排序器来保证顺序问题,我这里直接采用了ConcurrentQueue,保证原子操作的同时保证了顺序性,实现更简单一些。- public abstract class AggregateRoot
- {
- public ConcurrentQueue<object> LocalEvents { get; } = new();
- public void AddLocalEvent<TEvent>(TEvent eventData) where TEvent : IEvent
- {
- LocalEvents.Enqueue(eventData);
- }
- public bool GetLocalEvent(out object? @event)
- {
- LocalEvents.TryDequeue(out var eventData);
- @event = eventData;
- return @event is not null;
- }
- public void ClearLocalEvents()
- {
- LocalEvents.Clear();
- }
- }
复制代码 重写DbContext
主要是从ServiceProvider中获取对应实体类包含的事件,并且找到对应的Handler进行处理,然后再当作一个事务提交。- public class EventBusDbContext<TDbContext> : DbContext
- where TDbContext : DbContext
- {
- private readonly IServiceProvider _serviceProvider;
- public EventBusDbContext(DbContextOptions<TDbContext> options, IServiceProvider serviceProvider) : base(options)
- {
- _serviceProvider = serviceProvider;
- }
- public override int SaveChanges()
- {
- return base.SaveChanges();
- }
- public override int SaveChanges(bool acceptAllChangesOnSuccess)
- {
- return base.SaveChanges(acceptAllChangesOnSuccess);
- }
- public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
- {
- await HandleEventsAsync();
- return await base.SaveChangesAsync(cancellationToken);
- }
- public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
- {
- await HandleEventsAsync();
- return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
- }
- private async Task HandleEventsAsync()
- {
- foreach (var entityEntry in ChangeTracker.Entries<AggregateRoot>())
- {
- while (entityEntry.Entity.GetLocalEvent(out var @event))
- {
- if (@event is null) break;
- await HandleEventAsync(@event);
- }
- entityEntry.Entity.ClearLocalEvents();
- }
- }
- private async Task HandleEventAsync(object @event)
- {
- var eventHandlerType = typeof(IAsyncEventHandler<>).MakeGenericType(@event.GetType());
- var eventHandler = _serviceProvider.GetRequiredService(eventHandlerType);
- var method = eventHandler.GetType().GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleAsync));
- var exceptionHandleMethod = eventHandlerType.GetMethod(nameof(IAsyncEventHandler<IEvent>.HandleException));
- try
- {
- await (Task)method!.Invoke(eventHandler, new[] { @event })!;
- }
- catch (Exception ex)
- {
- exceptionHandleMethod!.Invoke(eventHandler, new[] { @event, ex });
- }
- }
- }
复制代码 分布式事件总线和事务
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。
来源:https://www.cnblogs.com/donpangpang/Undeclared/17939853
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|