|
前言
看过不少关于 await 的原理的文章,也知道背后是编译器给转成了状态机实现的,但是具体是怎么完成的,回调又是如何衔接的,一直都没有搞清楚,这次下定决心把源码自己跑了下,终于豁然开朗了
本文的演示代码基于 VS2022 + .NET 6
示例
- public class Program
- {
- static int Work()
- {
- Console.WriteLine("In Task.Run");
- return 1;
- }
- static async Task TestAsync()
- {
- Console.WriteLine("Before Task.Run");
- await Task.Run(Work);
- Console.WriteLine("After Task.Run");
- }
- static void Main()
- {
- _ = TestAsync();
- Console.WriteLine("End");
- Console.ReadKey();
- }
- }
复制代码- class Program
- {
- static int Work()
- {
- Console.WriteLine("In Task.Run");
- return 1;
- }
- static Task TestAsync()
- {
- var stateMachine = new StateMachine()
- {
- _builder = AsyncTaskMethodBuilder.Create(),
- _state = -1
- };
- stateMachine._builder.Start(ref stateMachine);
- return stateMachine._builder.Task;
- }
- static void Main()
- {
- _ = TestAsync();
- Console.WriteLine("End");
- Console.ReadKey();
- }
- class StateMachine : IAsyncStateMachine
- {
- public int _state;
- public AsyncTaskMethodBuilder _builder;
- private TaskAwaiter<int> _awaiter;
- void IAsyncStateMachine.MoveNext()
- {
- int num = _state;
- try
- {
- TaskAwaiter<int> awaiter;
- if (num != 0)
- {
- Console.WriteLine("Before Task.Run");
- awaiter = Task.Run(Work).GetAwaiter();
- if (!awaiter.IsCompleted)
- {
- _state = 0;
- _awaiter = awaiter;
- StateMachine stateMachine = this;
- _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
- return;
- }
- }
- else
- {
- awaiter = _awaiter;
- _awaiter = default;
- _state = -1;
- }
- awaiter.GetResult();
- Console.WriteLine("After Task.Run");
- }
- catch (Exception exception)
- {
- _state = -2;
- _builder.SetException(exception);
- return;
- }
- _state = -2;
- _builder.SetResult();
- }
- void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) { }
- }
- }
复制代码 状态机实现
- 我们看到实际是生成了一个隐藏的状态机类 StateMachine
- 把状态机的初始状态 _state 设置 -1
- stateMachine._builder.Start(ref stateMachine); 启动状态机,内部实际调用的就是状态机的 MoveNext 方法
- Task.Run 创建一个任务, 把委托放在 Task.m_action 字段,丢到线程池,等待调度
- 任务在线程池内被调度完成后,是怎么回到这个状态机继续执行后续代码的呢?
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); 就是关键了, 跟下去,到了如下的代码:- if (!this.AddTaskContinuation(stateMachineBox, false))
- {
- ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, true);
- }
- bool AddTaskContinuation(object tc, bool addBeforeOthers)
- {
- return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers));
- }
复制代码
- 这里很清楚的看到,尝试把状态机对象(实际是状态机的包装类),赋值到 Task.m_continuationObject, 如果操作失败,则把状态机对象丢进线程池等待调度,这里为什么这么实现,看一下线程池是怎么执行的就清楚了
线程池实现
- .NET6 的线程池实现,实际是放到了 PortableThreadPool, 具体调试步骤我就不放了,直接说结果就是, 线程池线程从任务队列中拿到任务后都执行了 DispatchWorkItem 方法
- static void DispatchWorkItem(object workItem, Thread currentThread)
- {
- Task task = workItem as Task;
- if (task != null)
- {
- task.ExecuteFromThreadPool(currentThread);
- return;
- }
- Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
- }
- virtual void ExecuteFromThreadPool(Thread threadPoolThread)
- {
- this.ExecuteEntryUnsafe(threadPoolThread);
- }
复制代码
- 我们看到, 线程池队列中的任务都是 object 类型的, 这里进行了类型判断, 如果是 Task , 直接执行 task.ExecuteFromThreadPool, 更有意思的这个方法是个虚方法,后面说明
- ExecuteFromThreadPool 继续追下去,我们来到了这里,代码做了简化
- private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null)
- {
- this.InnerInvoke();
- this.Finish(true);
- }
- virtual void InnerInvoke()
- {
- Action action = this.m_action as Action;
- if (action != null)
- {
- action();
- return;
- }
- }
复制代码 - 很明显 this.InnerInvoke 就是执行了最开始 Task.Run(Work) 封装的委托了, 在 m_action 字段
- this.Finish(true); 跟下去会发现会调用 FinishStageTwo 设置任务的完成状态,异常等, 继续调用 FinishStageThree 就来了重点: FinishContinuations 这个方法就是衔接后续回调的核心
- internal void FinishContinuations()
- {
- object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);
- if (obj != null)
- {
- this.RunContinuations(obj);
- }
- }
复制代码 - 还记得状态机实现么, Task.m_continuationObject 字段实际存储的就是状态机的包装类,这里线程池线程也会判断这个字段有值的话,就直接使用它执行后续代码了
- void RunContinuations(object continuationObject)
- {
- var asyncStateMachineBox = continuationObject as IAsyncStateMachineBox;
- if (asyncStateMachineBox != null)
- {
- AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag2);
- return;
- }
- }
- static void RunOrScheduleAction(IAsyncStateMachineBox box, bool allowInlining)
- {
- if (allowInlining && AwaitTaskContinuation.IsValidLocationForInlining)
- {
- box.MoveNext();
- return;
- }
- }
复制代码 总结
- Task.Run 创建 Task, 把委托放在 m_action 字段, 把 Task 压入线程池队列,等待调度
- _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); 尝试把状态机对象放在 Task.m_continuationObject 字段上,等待线程池线程调度完成任务后使用(用来执行后续),若操作失败,直接把状态机对象压入线程池队列,等待调度
- 线程池线程调度任务完成后,会判断 Task.m_continuationObject 有值,直接执行它的 MoveNext
备注
- 状态机实现中,尝试修改 Task.m_continuationObject,可能会失败,
就会直接把状态机对象压入线程池, 但是线程池调度,不都是判断是不是 Task 类型么, 其实状态机的包装类是 Task 的子类,哈哈,是不是明白了- class AsyncStateMachineBox<TStateMachine> : Task<TResult>, IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine
- static void DispatchWorkItem(object workItem, Thread currentThread)
- {
- Task task = workItem as Task;
- if (task != null)
- {
- task.ExecuteFromThreadPool(currentThread);
- return;
- }
- Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
- }
复制代码
- 还有就是状态机包装类,重写了 Task.ExecuteFromThreadPool,所以线程池调用 task.ExecuteFromThreadPool 就是直接调用了状态机的 MoveNext 了, Soga ^_^
- override void ExecuteFromThreadPool(Thread threadPoolThread)
- {
- this.MoveNext(threadPoolThread);
- }
复制代码 参考链接
- 关于线程池和异步的更深刻的原理,大家可以参考下面的文章
概述 .NET 6 ThreadPool 实现: https://www.cnblogs.com/eventhorizon/p/15316955.html
.NET Task 揭秘(2):Task 的回调执行与 await: https://www.cnblogs.com/eventhorizon/p/15912383.html
来源:https://www.cnblogs.com/broadm/archive/2023/11/15/17833442.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|