翼度科技»论坛 编程开发 .net 查看内容

自定义一个简单的Task调度器、任务循环调度器、TaskScheduler

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
前言:
  自从接触异步(async await  Task)操作后,始终都不明白,这个Task调度的问题。
  接触Quartz.net已经很久了,只知道它实现了一套Task调度的方法,自己跟着Quartz.net源代码写了遍,调试后我算是明白了Task调度的一部分事(
 )。
  春风来不远,只在屋东头。
  理解Task运行,请参考大佬文章 https://www.cnblogs.com/artech/p/task_scheduling.html ,推荐大佬的书。
  直到我看Quartz.net源代码中的任务调度 “QueuedTaskScheduler”,我才搞明白了,如何写一个简单的任务调度器,或者说线程如何执行代码,才不会造成死循环,CPU吃满等问题,下面代码有的直接从quartz.net copy过来的。
BlockingCollection类

  微软文档 https://learn.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview
  个人博客,中文解释通俗易懂 https://www.cnblogs.com/gl1573/p/14595985.html
  BlockingCollection 提供一个很重要的“阻塞”功能。
TaskScheduler类

  TaskScheduler 直译过来:表示一个对象,该对象处理将任务排队到线程上的低级工作。
  该类为抽象类,其真正意义在于“对Task任务的编排”
基于TaskScheduler类实现自定义的“Task队列调度器”

  源代码,我的仓库 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler
  1.定义一个存储Task的队列容器,使用BlockingCollection容器来添加Task,为什么使用BlockingCollection,后面会解释
  1.         /// <summary>The collection of tasks to be executed on our custom threads.</summary>
  2.         private readonly BlockingCollection<Task> _blockingTaskQueue;
复制代码
  2.定义CancellationTokenSource变量,用于释放。通常就是调用 CancellationToken.ThrowIfCancellationRequested() ,抛出一个 “OperationCanceledException”的异常,使正在执行的Task任务停止。
  3.创建Thread数组,用于存储创建出的Thread
  1.         /// <summary>The threads used by the scheduler to process work.</summary>
  2.         private readonly Thread[] _threads;
复制代码
   4.自定义一个类QueuedTaskScheduler,继承 “TaskScheduler”,“IDisposable”
  1. public class QueuedTaskScheduler: System.Threading.Tasks.TaskScheduler, IDisposable
复制代码
    实现构造函数
  1.         public QueuedTaskScheduler(int threadCount)
  2.         {
  3.             _threadCount = threadCount;
  4.             _blockingTaskQueue = new BlockingCollection<Task>();
  5.             // create threads
  6.             _threads = new Thread[threadCount];
  7.             for (int i = 0; i < threadCount; i++)
  8.             {
  9.                 _threads[i] = new Thread(ThreadBasedDispatchLoop)
  10.                 {
  11.                     Priority = ThreadPriority.Normal,
  12.                     IsBackground = true,
  13.                     Name = $"threadName ({i})"
  14.                 };
  15.             }
  16.             // start
  17.             foreach (var thread in _threads)
  18.             {
  19.                 thread.Start();
  20.             }
  21.         }
复制代码
    在构造函数中创建,并启动“Thread”,构造函数接收一个“线程数量的参数”,控制开启的线程数。
    Thread中实现的委托为“ThreadBasedDispatchLoop”,其表达意思是“基于循环的调度”。
  5.重点来了,具体看下“ThreadBasedDispatchLoop”方法的实现
ThreadBasedDispatchLoop实现
  1.          /// <summary>The dispatch loop run by all threads in this scheduler.</summary>
  2.         private void ThreadBasedDispatchLoop()
  3.         {
  4.             _taskProcessingThread.Value = true;
  5.             try
  6.             {
  7.                 // If a thread abort occurs, we'll try to reset it and continue running.
  8.                 while (true)
  9.                 {
  10.                     try
  11.                     {
  12.                         // For each task queued to the scheduler, try to execute it.
  13.                         foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
  14.                         {
  15.                             TryExecuteTask(task);
  16.                         }
  17.                     }
  18.                     catch (ThreadAbortException)
  19.                     {
  20.                         // If we received a thread abort, and that thread abort was due to shutting down
  21.                         // or unloading, let it pass through.  Otherwise, reset the abort so we can
  22.                         // continue processing work items.
  23.                         if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
  24.                         {
  25. #pragma warning disable SYSLIB0006
  26.                             Thread.ResetAbort();
  27. #pragma warning restore SYSLIB0006
  28.                         }
  29.                     }
  30.                 }
  31.             }
  32.             catch (OperationCanceledException)
  33.             {
  34.                 // If the scheduler is disposed, the cancellation token will be set and
  35.                 // we'll receive an OperationCanceledException.  That OCE should not crash the process.
  36.             }
  37.             finally
  38.             {
  39.                 _taskProcessingThread.Value = false;
  40.             }
  41.         }
复制代码
    在外层套一层try catch捕获 CancellationTokenSource 变量,取消操作(CancellationTokenSource.Cancel())产生的异常,并且忽略该异常。
    其中使用while(true),无限循环执行,?????奇了怪了,为什么以前写代码时,while(true)写了,会直接把CPU吃满,程序搞奔溃呢????

    关键点就在于

    当_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行时,如果_blockingTaskQueue容器中没有元素时,执行就会被“阻塞”,这种阻塞不会造成或者造成很小的资源浪费。
    当_blockingTaskQueue有值时,阻塞就会停止,_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)执行,返回一个Task对象,然后开始执行 TryExecuteTask(task) ,执行Task。
  6.继承 “TaskScheduler”后需要实现的几个方法
    
GetScheduledTasks
  1.          protected override IEnumerable<Task>? GetScheduledTasks()
  2.         {
  3.             return _blockingTaskQueue.ToList();
  4.         }
复制代码
    GetScheduledTasks 返回需要被调度的 Tasks
QueueTask
  1.          protected override void QueueTask(Task task)
  2.         {
  3.             // QueuedTaskScheduler 释放时,禁止向队列中添加Task
  4.             if (_disposeCancellation.IsCancellationRequested)
  5.             {
  6.                 throw new ObjectDisposedException(GetType().Name);
  7.             }
  8.             _blockingTaskQueue.Add(task);
  9.         }
复制代码
    QueueTask 将排队等候的Task加入到 “_blockingTaskQueue”队列变量中
TryExecuteTaskInline
  1.          protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  2.         {
  3.             // If we're already running tasks on this threads, enable inlining
  4.             return _taskProcessingThread.Value && TryExecuteTask(task);
  5.         }
复制代码
    意思是,参数task是否在此线程上运行,请查看ThreadBasedDispatchLoop方法。
    “ThreadLocal” 该类型变量声明生命周期跟随 “构造函数”中启动的线程,且每一个线程单独一个变量,值存储在线程上。
    自此自定义“Task调度器”完成。
启动,运行QueuedTaskScheduler

  

  1. 创建 QueuedTaskScheduler ,其中用于执行Task的线程数为 1
  2.创建 Task ,并将其加入到指定的 Task调度器中。
  3.调试一下
    A. 创建 QueuedTaskScheduler ,创建 线程 “Thread” ,并启动线程
    

     B. 调试过程 
      

       当 _blockingTaskQueue没有Task时,执行到 _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token) 就会阻塞。
自此 自定义TaskScheduler完成。
我的源代码 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler 

来源:https://www.cnblogs.com/youlicc/archive/2023/05/17/17403429.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具