翼度科技»论坛 编程开发 .net 查看内容

.net core使用channel消息队列

9

主题

9

帖子

27

积分

新手上路

Rank: 1

积分
27
.net core使用channel消息队列

背景

最近做一个项目,连接了很多设备,需要保存设备的心跳数据,刚开始的做法是直接接收到设备的数据之后进行心跳数据的保存,但是随着设备多了起来,然后设备的使用时长不断的加大,对数据库的压力也比较大,所以想着优化一下。
方案调研

1.使用第三方中间件

常见的使用redis,或者mq,只需要不断的向中间件发送数据即可,redis使用队列,如果是mq直接发送消息即可,使用起来简单方便,但是要引入这些中间件,目前的架构里面没有,需要自己去起服务,维护。
2.使用channel

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它可以用来做消息队列,进行数据的生产和消费, 公开的 Writer 和 Reader api对应消息的生产者和消费者,也让Channel更加的简洁和易用,与Rabbit MQ 等其他队列不同的是,Channel 是进程内的队列
目前就介绍来看非常完美,不需要添加第三方中间件,直接添加现有的模块即可。
代码实现

选择了使用channel来做优化。拿到设备数据之后直接把消息丢入到channel,然后后台使用定时任务或者自己实现hostservice去不断的消费数据。
生产者代码
  1. public async Task ProduceHeartBeat(string message)
  2.         {
  3.             await channel.Writer.WriteAsync(message);
  4.         }
复制代码
不断的向里面写入数据即可.
消费者代码
  1.         /// <summary>
  2.         /// timespan时间内消费多少数据
  3.         /// </summary>
  4.         /// <param name="count"></param>
  5.         /// <param name="timeSpan"></param>
  6.         /// <returns></returns>
  7.         public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
  8.         {
  9.             var result = new List<string>(count);
  10.             CancellationTokenSource cts = new CancellationTokenSource();
  11.             var cancellationToken = cts.Token;
  12.             cts.CancelAfter(timeSpan);
  13.             int rcount = 0;
  14.             while ( !cancellationToken.IsCancellationRequested && rcount<count)
  15.             {
  16.                 //await Task.Delay(2000);
  17.                 if (channel.Reader.TryRead(out var number))
  18.                 {
  19.                     Console.WriteLine(number);
  20.                     result.Add(number);
  21.                     rcount++;
  22.                 }
  23.                 else
  24.                 {
  25.                     break;
  26.                 }
  27.                
  28.             }  
  29.             return result;
  30.         }
复制代码
使用的是BackgroundServic,直接实现要处理的业务逻辑就好了。在这里使用的是TaskCreationOptions.LongRunning,新开一个线程去处理心跳数据。
总结

以上就是主要的实现全过程,完整的代码在github
https://github.com/lackguozi/LearnChannelWebApi
实际上完全可以不用后台去定时消费数据,channel有很多api可以去处理,比如WaitToReadAsync(),但是这里没有使用,主要是不想持续的占数据库资源???总结的话学习了channel的用法,底层似乎使用了deque??只稍微看了下源码,但是看到了许多的lock,这个是必不可少的。还是巨硬轮子造的好 =_=

来源:https://www.cnblogs.com/guoxiaotian/archive/2023/06/26/17506536.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具