|
生成者/消费者概念编程模型
通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型将数据从一方移交给另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项控制通道的行为,例如允许它们存储的元素数,以及达到该限制时会发生什么情况,或者通道是由多个生成者还是多个使用者同时访问
channel简介
channel提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。
channel(管道)提供了有界通道和无界通道
无界通道
该通道可以同时供任意数量的读取器和编写器使用。 或者,可以通过提供 UnboundedChannelOptions 实例在创建无限制通道时指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行
有界通道
创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道
模式行为
BoundedChannelFullMode.Wait
这是默认值。 WriteAsync调用 以等待空间可用以完成写入操作。 调用 以 TryWrite 立即返回 false 。
BoundedChannelFullMode.DropNewest
删除并忽略通道中的最新项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropOldest
删除并忽略通道中的最旧项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropWrite 删除要写入的项。
Channel.Writer API
生成者功能在 Channel.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:
ChannelWriter.Complete
将通道标记为已完成,这意味着不再向该通道写入更多项。
ChannelWriter.TryComplete
尝试将通道标记为已完成,这意味着不会向通道写入更多数据。
ChannelWriter.TryWrite
尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 ChannelWriter.Complete 或 ChannelWriter.TryComplete 发出完成信号,否则这将始终返回 true。
ChannelWriter.WaitToWriteAsync
返回一个 ValueTask ,当有空间可以写入项时完成。
ChannelWriter.WriteAsync 以异步方式将项写入到通道
Channel.Reader API
ChannelReader.ReadAllAsync
创建允许从通道中读取所有数据的 IAsyncEnumerable。
ChannelReader.ReadAsync
以异步方式从通道中读取项。
ChannelReader.TryPeek
尝试从通道中查看项。
ChannelReader.TryRead
尝试从通道中读取项。
ChannelReader.WaitToReadAsync
返回在 ValueTask 数据可供读取时完成的 。
channel的具体使用
https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels
基于channel实现事件总线
EventDiscriptorAttribute 特性定义
- [AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
- public class EventDiscriptorAttribute:Attribute
- {
- /// <summary>
- /// 事件2名称
- /// </summary>
- public string EventName { get; private set; }
- /// <summary>
- /// channel 容量设置
- /// </summary>
- public int Capacity { get; private set; }
- /// <summary>
- /// 是否维持一个生产者多个消费者模型
- /// </summary>
- public bool SigleReader { get; private set; }
- public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
- {
- EventName = eventName;
- Capacity = capacity;
- SigleReader = sigleReader;
- }
- }
复制代码 定义通道容器
- //通道容器单列注入,在拓展类中初始化
- public class ChannelContainer : IChannelContainer
- {
- public List<EventDiscription> Events { get; private set; }
- private readonly IServiceCollection Services;
- public EventHandlerContainer(IServiceCollection services)
- {
- Events = new List<EventDiscription>();
- Services = services;
- services.AddSingleton<IEventHandlerContainer>(this);
- }
- private bool Check(Type type)
- {
- var discription = Events.FirstOrDefault(p=>p.EtoType == type);
- return discription is null;
- }
-
- ///订阅并且注入EventHandler
- public void TryAddChannle(Type eto,Type handler)
- {
- if(!Check(eto))
- {
- return;
- }
- Events.Add(new EventDiscription(eto, handler));
- var handlerbaseType = typeof(IEventHandler<>);
- var handlertype = handlerbaseType.MakeGenericType(eto);
- if(Services.Any(P=>P.ServiceType==handlertype))
- {
- return;
- }
- Services.AddTransient(handlertype, handler);
- }
- public void TryAddChannle<TEto, THandler>()
- {
- TryAddChannle(typeof(TEto),typeof(THandler));
- }
-
- public void TryAddChannle(Type eto)
- {
- if (!Check(eto))
- {
- return;
- }
- Events.Add(new EventDiscription(eto));
- var handlerbaseType = typeof(IEventHandler<>);
- var handlertype = handlerbaseType.MakeGenericType(eto);
- if (Services.Any(P => P.ServiceType == handlertype))
- {
- return;
- }
- }
- public void TryAddChannle<TEto>()
- {
- TryAddChannle(typeof(TEto));
- }
复制代码 事件管理器
事件管理器通过线程安全字典管理事件通道和事件的触发
可以看到在Subscribe 方法中消费者并不是在订阅后立即执行的而是放到EventTrigger中的定义的异步事件中去
消费者执行最后又.,NET提供的托管任务去执行- public class EventHandlerManager : IEventHandlerManager,IDisposable
- {
- private ConcurrentDictionary<string, Channel<string>> Channels;
- private bool IsDiposed = false;
- private readonly IServiceProvider ServiceProvider;
- private readonly CancellationToken _cancellation;
- private readonly IEventHandlerContainer _eventHandlerContainer;
- private readonly ILogger _logger;
- private ConcurrentDictionary<string,EventTrigger> EventTriggers;
- private bool IsInitConsumer = true;
- public EventHandlerManager( IServiceProvider serviceProvider
- , IEventHandlerContainer eventHandlerContainer
- , ILoggerFactory loggerFactory)
- {
- ServiceProvider = serviceProvider;
- _cancellation = CancellationToken.None;
- _eventHandlerContainer = eventHandlerContainer;
- Channels = new ConcurrentDictionary<string, Channel<string>>();
- EventTriggers = new ConcurrentDictionary<string, EventTrigger>();
- _logger = loggerFactory.CreateLogger<IEventHandlerManager>();
- }
- //初始化通信管道
- public async Task CreateChannles()
- {
- var eventDiscriptions = _eventHandlerContainer.Events;
- foreach(var item in eventDiscriptions)
- {
- var attribute = item.EtoType.GetCustomAttributes()
- .OfType<EventDiscriptorAttribute>()
- .FirstOrDefault();
- if (attribute is null)
- {
- ThorwEventAttributeNullException.ThorwException();
- }
- var channel = Channels.GetValueOrDefault(attribute.EventName);
- if (channel is not null)
- {
- return;
- }
- //创建无界通道模型,并且初始化容量大小,当无容量写入后等待写入
- channel = Channel.CreateBounded<string>(
- new BoundedChannelOptions(attribute.Capacity)
- {
- SingleWriter = true,
- SingleReader = false,
- AllowSynchronousContinuations = false,
- FullMode = BoundedChannelFullMode.Wait
- });
- Channels.TryAdd(attribute.EventName, channel);
- _logger.LogInformation($"创建通信管道{item.EtoType}--{attribute.EventName}");
- }
- await Task.CompletedTask;
- }
- private Channel<string> Check(Type type)
- {
- var attribute = type .GetCustomAttributes()
- .OfType<EventDiscriptorAttribute>()
- .FirstOrDefault();
- if (attribute is null)
- {
- ThorwEventAttributeNullException.ThorwException();
- }
- var channel = Channels.GetValueOrDefault(attribute.EventName);
- if(channel is null)
- {
- ThrowChannelNullException.ThrowException(attribute.EventName);
- }
- return channel;
- }
- public void Dispose()
- {
- IsDiposed = true;
- IsInitConsumer = true;
- foreach(var trigger in EventTriggers.Values)
- {
- trigger.Dispose();
- }
- _cancellation.ThrowIfCancellationRequested();
- }
- /// <summary>
- /// 生产者
- /// </summary>
- /// <typeparam name="TEto"></typeparam>
- /// <param name="eto"></param>
- /// <returns></returns>
- public async Task WriteAsync<TEto>(TEto eto) where TEto : class
- {
- var channel = Check(typeof(TEto));
- //由于创建的是有界通道,存在有界通道消息积累超过初始大小所以循环判断是否可以写入消息
- while ( await channel.Writer.WaitToWriteAsync(CancellationToken.None))
- {
- var data = JsonConvert.SerializeObject(eto);
- await channel.Writer.WriteAsync(data, _cancellation);
- }
- }
- /// <summary>
- /// 消费者
- /// </summary>
- /// <returns></returns>
- public void Subscribe<TEto>() where TEto : class
- {
- var attribute = typeof(TEto).GetCustomAttributes()
- .OfType<EventDiscriptorAttribute>()
- .FirstOrDefault();
- if (attribute is null)
- {
- ThorwEventAttributeNullException.ThorwException();
- }
- if (EventTriggers.Keys.Any(p => p == attribute.EventName))
- {
- return;
- }
- Func<Task> func = async () =>
- {
- var scope = ServiceProvider.CreateAsyncScope();
- var channel = Check(typeof(TEto));
- var handler = scope.ServiceProvider.GetRequiredService<IEventHandler<TEto>>();
- var reader = channel.Reader;
- try
- {
- while (await channel.Reader.WaitToReadAsync())
- {
- while (reader.TryRead(out string str))
- {
- var data = JsonConvert.DeserializeObject<TEto>(str);
- _logger.LogInformation(str);
- await handler.HandelrAsync(data);
- }
- }
- }
- catch (Exception e)
- {
- _logger.LogInformation($"本地事件总线异常{e.Source}--{e.Message}--{e.Data}");
- throw;
- }
- };
- var trigger = new EventTrigger();
- trigger.Recived(func);
- EventTriggers.TryAdd(attribute.EventName, trigger);
- }
- public Task Trigger()
- {
- //只允许初始化一次消费者
- if (IsInitConsumer)
- {
- foreach (var eventTrigger in EventTriggers)
- {
- Task.Factory.StartNew(async () =>
- {
- await eventTrigger.Value.Trigger();
- });
- }
- }
- IsInitConsumer = false;
- return Task.CompletedTask;
- }
- }
- }
复制代码 EventTrigger 定义
- public class EventTrigger:IDisposable
- {
- public event Func<Task>? Event;
- public EventTrigger()
- {
- }
- public void Recived(Func<Task> func)
- {
- if (Event is not null)
- {
- return;
- }
- Event += func;
- }
- public Task Trigger()
- {
- if(Event is null)
- {
- return Task.CompletedTask;
- }
- return Event();
- }
- public void Dispose()
- {
- if( Event is not null )
- {
- Event = null;
- }
- }
- }
复制代码 托管任务执行EventHandlerManager Trigger()方法
- public class EventBusBackgroundService : BackgroundService
- {
- private readonly IEventHandlerManager _eventHandlerManager;
- public EventBusBackgroundService(IEventHandlerManager eventHandlerManager)
- {
- _eventHandlerManager = eventHandlerManager;
- }
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- await _eventHandlerManager.Trigger();
- }
- }
复制代码 拓展类定义
- public static class EventBusExtensions
- {
- //添加事件总线并且添加channle管道
- public static IServiceCollection AddEventBusAndChannles(this IServiceCollection services,Action<EventHandlerContainer> action)
- {
- services.AddSingleton<IEventHandlerManager, EventHandlerManager>();
- services.AddTransient<ILocalEventBus, LocalEventBus>();
- ///添加托管任务
- services.AddHostedService<EventBusBackgroundService>();
- EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);
- action.Invoke(eventHandlerContainer);
- return services;
- }
- //创建通信管道
- public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
- {
- var scope = serviceProvider.CreateAsyncScope();
- var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
-
- //初始化通信管道
- await eventhandlerManager.CreateChannles();
- action.Invoke(eventhandlerManager);
- }
- //添加本地事件总线
- public static IServiceCollection AddEventBus(this IServiceCollection services)
- {
- services.AddSingleton<IEventHandlerManager, EventHandlerManager>();
- services.AddTransient<ILocalEventBus, LocalEventBus>();
- services.AddHostedService<EventBusBackgroundService>();
- return services;
- }
- //添加通信管道
- public static IServiceCollection AddChannles(this IServiceCollection services, Action<EventHandlerContainer> action)
- {
- EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);
- action.Invoke(eventHandlerContainer);
- return services;
- }
- }
- }
复制代码 使用
- context.Services.AddEventBus();
- //添加通信管道
- context.Services.AddChannles(p =>
- {
- p.TryAddChannle<TestEto>();
- });
- //
- var scope = context.ServiceProvider.CreateScope();
- var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
- //初始化通信管道
- await eventhandlerManager.CreateChannles();
- //订阅事件
- eventhandlerManager.Subscribe<TestEto>();
复制代码- //定义EventHandler
- public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
- {
- private ILogger _logger;
- public TestEventHandler(ILoggerFactory factory)
- {
- _logger = factory.CreateLogger<TestEventHandler>();
- }
- public Task HandelrAsync(TestEto eto)
- {
- _logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
- return Task.CompletedTask;
- }
- }
-
- //构造函数注入
- [HttpGet]
- public async Task TestLocalEventBus()
- {
- TestEto eto = null;
- for(var i = 0; i < 100; i++)
- {
- eto = new TestEto()
- {
- Name ="LocalEventBus" + i.ToString(),
- Description ="wyg"+i.ToString(),
- };
- await _localEventBus.PublichAsync(eto);
- }
- }
复制代码 总结
作为一个才毕业一年的初级程序员的我来说这次的channel的事件总线的封装还存在着许多不足
1.无法对消息进行持久化的管理
2.没有对消息异常进行处理
3.没有做到像abp那样自动订阅
当然还存在着一些我不知道问题,欢迎各位大佬提出问题指正
源码链接
这里提一嘴(有个小小的请求),有广州的老哥所在的公司目前还招人的话,希望给小弟一个机会(找了快一个月工作了,确实有点难,联系方式vx:wenyg2001411)
来源:https://www.cnblogs.com/wygbjd/archive/2023/09/19/17715763.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|