翼度科技»论坛 编程开发 .net 查看内容

模拟实现.net中的Task机制:探索异步编程的奥秘

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
.net中使用Task可以方便地编写异步程序,为了更好地理解Task及其调度机制,接下来模拟Task的实现,目的是搞清楚:

  • Task是什么
  • Task是如何被调度的
基本的Task模拟实现

从最基本的Task用法开始
  1. Task.Run(Action action)
复制代码
这个命令的作用是将action作为一项任务提交给调度器,调度器会安排空闲线程来处理。
我们使用Job来模拟Task
  1. public class Job
  2. {
  3.     private readonly Action _work;
  4.     public Job(Action work) => _work = work;
  5.     public JobStatus Status { get; internal set; }
  6.     internal protected virtual void Invoke()
  7.     {
  8.         Status = JobStatus.Running;
  9.         _work();
  10.         Status = JobStatus.Completed;
  11.     }
  12.     public void Start(JobScheduler? scheduler = null)
  13.         => (scheduler ?? JobScheduler.Current).QueueJob(this);
  14.     public static Job Run(Action work)
  15.     {
  16.         var job = new Job(work);
  17.         job.Start();
  18.         return job;
  19.     }
  20. }
  21. public enum JobStatus
  22. {
  23.     Created,
  24.     Scheduled,
  25.     Running,
  26.     Completed
  27. }
复制代码
这里也定义了同Task一样的静态Run方法,使用方式也与Task类似
  1. Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));
复制代码
作为对比,使用Task时的写法如下,多了await关键字,后文会讨论。
  1. await Task.Run(()=>() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));
复制代码
调用Job.Run方法时,会基于给定的Action创建一个Job,然后执行job.Start(), 但Job没有立即开始执行,而是通过QueueJob方法提交给了调度器,由调度器来决定Job何时执行,在Job真正被执行时会调用其Invoke方法,此时给定的Action就会被执行了,同时会对应修改Job的状态,从Running到Completed。简单来说,.net的Task的基本工作过程与这个粗糙的Job一样,由此可见,Task/Job代表一项具有某种状态的操作
基于线程池的调度

但Task/Job的执行依赖与调度器,这里用JobScheduler来模拟,.net默认使用基于线程池的调度策略,我们也模拟实现一个ThreadPoolJobScheduler
首先看下JobScheduler,作为抽象基类,其QueueJob方法将有具体的某个调度器(ThreadPoolJobScheduler)来实现:
  1. public abstract class JobScheduler
  2. {
  3.     public abstract void QueueJob(Job job);
  4.     public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
  5. }
复制代码
ThreadPoolJobScheduler实现的QueueJob如下:
  1. public class ThreadPoolJobScheduler : JobScheduler
  2. {
  3.     public override void QueueJob(Job job)
  4.     {
  5.         job.Status = JobStatus.Scheduled;
  6.         var executionContext = ExecutionContext.Capture();
  7.         ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!,
  8.             _ => job.Invoke(), null));
  9.     }
  10. }
复制代码
ThreadPoolJobScheduler会将Job提交给线程池,并将Job状态设置为Scheduled。
使用指定线程进行调度

JobScheduler的Current属性默认设置为基于线程的调度,如果有其它调度器也可以更换,但为什么要更换呢?这要从基于线程的调度的局限说起,对于一些具有较高优先级的任务,采用这个策略可能会无法满足需求,比如当线程都忙的时候,新的任务可能迟迟无法被执行。对于这种情况,.net可以通过设置TaskCreationOptions.LongRunning来解决,解析来先用自定义的调度器来解决这个问题:
  1. public class DedicatedThreadJobScheduler : JobScheduler
  2. {
  3.     private readonly BlockingCollection<Job> _queues=new();
  4.     private readonly Thread[] _threads;
  5.     public DedicatedThreadJobScheduler(int threadCount)
  6.     {
  7.         _threads=new Thread[threadCount];
  8.         for(int index=0; index< threadCount; index++)
  9.         {
  10.             _threads[index] =new Thread(Invoke);
  11.         }
  12.         Array.ForEach(_threads, thread=>thread.Start());
  13.         void Invoke(object? state){
  14.             while(true){
  15.                 _queues.Take().Invoke();
  16.             }
  17.         }
  18.     }
  19.     public override void QueueJob(Job job)
  20.     {
  21.         _queues.Add(job);
  22.     }
  23. }
复制代码
在启动DedicatedThreadJobScheduler时,会启动指定数量的线程,这些线程会不停地从队列中取出任务并执行。
接下来看看.net的TaskCreationOptions.LongRunning怎么用:
  1. await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);
  2. static void LongRunningMethod()
  3. {
  4.     // Simulate a long-running operation
  5.     Console.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);
  6.     Thread.Sleep(10000);
  7.     Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
  8. }
复制代码
任务顺序的编排

在使用Task时,经常会使用await关键字,来控制多个异步任务之间的顺序,await实际上是语法糖,在了解await之前,先来看看最基本的ContinueWith方法。
  1. var taskA = Task.Run(() => DateTime.Now);
  2. var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
  3. await taskB;
复制代码
模仿Task,我们给Job也添加ContinueWith方法。
  1. public class Job
  2. {
  3.     private readonly Action _work;
  4.     private Job? _continue;
  5.     public Job(Action work) => _work = work;
  6.     public JobStatus Status { get; internal set; }
  7.     internal protected virtual void Invoke()
  8.     {
  9.         Status = JobStatus.Running;
  10.         _work();
  11.         Status = JobStatus.Completed;
  12.         _continue?.Start();
  13.     }
  14.     public void Start(JobScheduler? scheduler = null)
  15.         => (scheduler ?? JobScheduler.Current).QueueJob(this);
  16.     public static Job Run(Action work)
  17.     {
  18.         var job = new Job(work);
  19.         job.Start();
  20.         return job;
  21.     }
  22.     public Job ContinueWith(Action<Job> tobeContinued)
  23.     {
  24.         if (_continue == null)
  25.         {
  26.             var job = new Job(() => tobeContinued(this));
  27.             _continue = job;
  28.         }
  29.         else
  30.         {
  31.             _continue.ContinueWith(tobeContinued);
  32.         }
  33.         return this;
  34.     }
  35. }
复制代码
这个ContinueWith方法会将下一个待执行的Job放在_continue,这样多个顺序执行的Job就会构成一个链表。
在当前Job的Invoke方法执行结束时,会触发下一个Job被调度。
使用示例:
  1. Job.Run(() =>
  2. {
  3.     Thread.Sleep(1000);
  4.     Console.WriteLine("11");
  5. }).ContinueWith(_ =>
  6. {
  7.     Thread.Sleep(1000);
  8.     Console.WriteLine("12");
  9. });
复制代码
进一步使用await关键字来控制

要像Task一样使用await,需要Job支持有GetAwaiter方法。任何一个类型,只要有了这个GetAwaiter方法,就可以对其使用await关键字了。
c#的Task类中可以找到GetAwaiter
  1. public TaskAwaiter GetAwaiter();
复制代码
然后TaskAwaiter继承了ICriticalNotifyCompletion接口
  1. public readonly struct TaskAwaiter<TResult> : System.Runtime.CompilerServices.ICriticalNotifyCompletion
复制代码
照猫画虎,也为Job添加一个最简单的JobAwaiter
  1. public class Job
  2. {
  3.     ...
  4.     public JobAwaiter GetAwaiter() => new(this);
  5. }
复制代码
JobAwaiter的定义如下:
  1. public struct JobAwaiter : ICriticalNotifyCompletion
  2. {
  3.     private readonly Job _job;
  4.     public readonly bool IsCompleted => _job.Status == JobStatus.Completed;
  5.     public JobAwaiter(Job job)
  6.     {
  7.         _job = job;
  8.         if (job.Status == JobStatus.Created)
  9.         {
  10.             job.Start();
  11.         }
  12.     }
  13.    
  14.     public void GetResult() { }
  15.     public void OnCompleted(Action continuation)
  16.     {
  17.         _job.ContinueWith(_ => continuation());
  18.     }
  19.     public void UnsafeOnCompleted(Action continuation)
  20.     => OnCompleted(continuation);
  21. }
复制代码
添加了await后,前面的代码也可以这样写:
  1. await F1();
  2. await F2();
  3. static Job F1() => new Job(() =>
  4. {
  5.         Thread.Sleep(1000);
  6.         Console.WriteLine("11");
  7. });
  8. static Job F2() => new Job(() =>
  9. {
  10.         Thread.Sleep(1000);
  11.         Console.WriteLine("12");
  12. });
复制代码
总结

回顾开头的两个问题,现在可以尝试给出答案了。

  • Task是什么,Task是一种有状态的操作(Created,Scheduled,Running,Completed),是对耗时操作的抽象,就像现实中的一项任务一样,它的执行需要相对较长的时间,它也有创建(Created),安排(Scheduled),执行(Running),完成(Completed)的基本过程。任务完成当然需要拿到结果的,这里的Job比较简单,没有模拟具体的结果;
  • Task是如何被调度的,默认采用基于线程池的调度,即创建好Task后,由线程池中的空闲线程执行,具体什么时候执行、由哪个线程执行,开发者是不用关心的,在具体执行过程中,
    但由于.net全局线程池的局限,对于一些特殊场景无法满足时(比如需要立即执行Task),此时可以通过TaskCreationOptions更改调度行为;
另外,await是语法糖,它背后的实现是基于GetAwaiter,由其返回ICriticalNotifyCompletion接口的实现,并对ContinueWith做了封装。

来源:https://www.cnblogs.com/zhixin9001/archive/2023/11/01/17804392.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具