祖国的儿子 发表于 2023-8-16 22:07:25

async_await 源码分析

async/await 源码解析

这篇文章主要是分析 async/await 这个语法糖,分析一下 async 和 await 是如何做到异步的。首先,我先抛出两个问题,各位可以先想一下。

[*]await 之后的方法是何时执行,如何执行的?
[*]为什么 await 之后的代码会在不同的线程执行?
demo

要想知道 async/await 是怎么运行的,需要先写一个demo,然后进行一下反编译,就可以得到 async/await 编译后的代码,然后就可以开始分析了。
下面是简单使用 async/await 的demo:
static async Task Main(string[] args)
{
       Console.WriteLine("1"+Thread.CurrentThread.ManagedThreadId.ToString());
       await GetThreadID2();
       Console.WriteLine("2"+Thread.CurrentThread.ManagedThreadId.ToString());
}
public async static Task GetThreadID2()
{
       Console.WriteLine("3"+Thread.CurrentThread.ManagedThreadId.ToString());
       await Task.Run(() => Console.WriteLine("4"+Thread.CurrentThread.ManagedThreadId.ToString()));         
       Console.WriteLine("5"+Thread.CurrentThread.ManagedThreadId.ToString());
}然后我们来反编译一下这段代码(可以使用 dnSpy 进行反编译):
namespace AsyncTest
{
    internal class Program
    {
      
      private static Task Main(string[] args)
      {
            Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();
            <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
            <Main>d__.args = args;
            <Main>d__.<>1__state = -1;
            <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);
            return <Main>d__.<>t__builder.Task;
      }
      
      public static Task GetThreadID2()
      {
            Program.<GetThreadID2>d__2 <GetThreadID2>d__ = new Program.<GetThreadID2>d__2();
            <GetThreadID2>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
            <GetThreadID2>d__.<>1__state = -1;
            <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__);
            return <GetThreadID2>d__.<>t__builder.Task;
      }

      
      private static void <Main>(string[] args)
      {
            Program.Main(args).GetAwaiter().GetResult();
      }
      
      private sealed class <Main>d__0 : IAsyncStateMachine
      {
            void IAsyncStateMachine.MoveNext()
            {
                int num = this.<>1__state;
                try
                {
                  TaskAwaiter awaiter;
                  if (num != 0)
                  {
                        Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());
                        awaiter = Program.GetThreadID2().GetAwaiter();
                        if (!awaiter.IsCompleted)
                        {
                            this.<>1__state = 0;
                            this.<>u__1 = awaiter;
                            Program.<Main>d__0 <Main>d__ = this;
                            this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);
                            return;
                        }
                  }
                  else
                  {
                        awaiter = this.<>u__1;
                        this.<>u__1 = default(TaskAwaiter);
                        this.<>1__state = -1;
                  }
                  awaiter.GetResult();
                  Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());
                  Console.ReadLine();
                }
                catch (Exception exception)
                {
                  this.<>1__state = -2;
                  this.<>t__builder.SetException(exception);
                  return;
                }
                this.<>1__state = -2;
                this.<>t__builder.SetResult();
            }
      }
      
      
      private sealed class <>c
      {
            internal void <GetThreadID2>b__2_0()
            {
                Console.WriteLine("4" + Thread.CurrentThread.ManagedThreadId.ToString());
            }
            public static readonly Program.<>c <>9 = new Program.<>c();

            public static Func<string> <>9__1_0;

            public static Action <>9__2_0;
      }
      
      private sealed class <GetThreadID2>d__2 : IAsyncStateMachine
      {
            void IAsyncStateMachine.MoveNext()
            {
                int num = this.<>1__state;
                try
                {
                  TaskAwaiter awaiter;
                  if (num != 0)
                  {
                        Console.WriteLine("3" + Thread.CurrentThread.ManagedThreadId.ToString());
                        awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__2_0)).GetAwaiter();
                        if (!awaiter.IsCompleted)
                        {
                            this.<>1__state = 0;
                            this.<>u__1 = awaiter;
                            Program.<GetThreadID2>d__2 <GetThreadID2>d__ = this;
                            this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__);
                            return;
                        }
                  }
                  else
                  {
                        awaiter = this.<>u__1;
                        this.<>u__1 = default(TaskAwaiter);
                        this.<>1__state = -1;
                  }
                  awaiter.GetResult();
                  Console.WriteLine("5" + Thread.CurrentThread.ManagedThreadId.ToString());
                }
                catch (Exception exception)
                {
                  this.<>1__state = -2;
                  this.<>t__builder.SetException(exception);
                  return;
                }
                this.<>1__state = -2;
                this.<>t__builder.SetResult();
            }
            
            void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine){}

            public int <>1__state;

            public AsyncTaskMethodBuilder <>t__builder;

            private TaskAwaiter <>u__1;
      }
    }
}上面的代码是反编译出来的所有代码,如果各位选择太长不看的话,这里还有个简化版本:
private static Task Main(string[] args)
<Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);
<Main>d__0.MoveNext()
    <GetThreadID2>d__.<>t__builder.Start<Program.<GetThreadID2>d__2>(ref <GetThreadID2>d__);
    <GetThreadID2>d__2.MoveNext()
    <GetThreadID2>d__2.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<GetThreadID2>d__2>(ref awaiter, ref <GetThreadID2>d__);
    <GetThreadID2>d__2.MoveNext()
<Main>d__0.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);
<Main>d__0.MoveNext() 入口

我们先来看一下 Main 方法。
private static Task Main(string[] args)      
{
    Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();
    <Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
    <Main>d__.args = args;
    <Main>d__.<>1__state = -1;
    <Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);
    return <Main>d__.<>t__builder.Task;
}首先第一行,构建了一个 Program.d__0 类型实例,嗯? 这个类哪里来的,没写过呀。
哎嘿,这个类就是编译器帮我们实现的。每一个 async/await 方法,编译器都会帮我们实现一个这样的类。
简单分析一下这个方法,实例化一个 Program.d__0(), 初始化 1__state 值为-1,调用 Program.d__0() 的 Start 方法。
Start

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    ExecutionContextSwitcher executionContextSwitcher = default(ExecutionContextSwitcher);
    RuntimeHelpers.PrepareConstrainedRegions();
    try
    {
      ExecutionContext.EstablishCopyOnWriteScope(ref executionContextSwitcher);
      stateMachine.MoveNext();
    }
    finally
    {
      executionContextSwitcher.Undo();
    }
}Start 代码的主要作用就是启动 stateMachine,即 stateMachine.MoveNext()。
MoveNext

MoveNext 是 async/await 里非常重要的一段代码,这个方法控制了整个异步任务的执行步骤。
我们可以将其分解成三个部分,即

[*]await 之前
[*]await 执行
[*]await 之后的代码
具体的分析各位请看注释。
void IAsyncStateMachine.MoveNext()
{
    // 这里 <>1__state 之前初始化的时候是 -1,所以 num 就是-1
    int num = this.<>1__state;
    try
    {
      TaskAwaiter awaiter;
      // 第一次进来,num 是 -1 所以接下来的逻辑
      if (num != 0)
      {
            // 这里执行 await 之前的代码
            Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());
            // 这里就是执行await 方法
            awaiter = Program.GetThreadID2().GetAwaiter();
            // 判断是否执行完成,通常第一次进来 IsCompleted 都是 false
            if (!awaiter.IsCompleted)
            {
                // 修改状态,下次再执行 MoveNext 就不会继续走这段逻辑来了
                this.<>1__state = 0;
                this.<>u__1 = awaiter;
                Program.<Main>d__0 <Main>d__ = this;
                // 这里其实是往 Task 里添加了一个 任务结束时的回调,在任务结束时会再次调用 MoveNext
                // 这就解释了为什么 await 之后的方法是另外一个线程,因为await 之后的方法是在 下一个 MoveNext 的里调用的
                // 而那个 MoveNext 是由线程池挑选一个线程进行执行的            
                this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, Program.<Main>d__0>(ref awaiter, ref <Main>d__);
                return;
            }
      }
      else
      {
      awaiter = this.<>u__1;
            this.<>u__1 = default(TaskAwaiter);
            this.<>1__state = -1;
      }
      // GetResult 里有一个 死循环 等待结果
      awaiter.GetResult();
      // await 之后的方法
      Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());
      Console.ReadLine();
    }
    catch (Exception exception)
    {
      this.<>1__state = -2;
      this.<>t__builder.SetException(exception);
      return;
    }
    this.<>1__state = -2;
    this.<>t__builder.SetResult();
}await 之前的代码

// 这里执行 await 之前的代码
Console.WriteLine("1" + Thread.CurrentThread.ManagedThreadId.ToString());await 之前的代码很简单,就是正常依次执行。
await 的代码

await 的代码大都以 Task 形式执行,主要分为两步。
第一步,将 Task 推入线程池中。
awaiter = Task.Run(new Action(Program.<>c.<>9.<GetThreadID2>b__3_0)).GetAwaiter();
public static Task Run(Action action)      
{            
    StackCrawlMark stackCrawlMark = StackCrawlMark.LookForMyCaller;            
    return Task.InternalStartNew(null, action, null, default(CancellationToken), TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackCrawlMark);      
}
internal static Task InternalStartNew(Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark)      
{               
    Task task = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);                     
    task.ScheduleAndStart(false);            
    return task;      
}
internal void ScheduleAndStart(bool needsProtection)
{
    try
    {
   this.m_taskScheduler.InternalQueueTask(this);
    }
    catch (ThreadAbortException exceptionObject){}
}
internal void InternalQueueTask(Task task)      
{                  
    this.QueueTask(task);      
}第二步,线程池执行 Task。
void IThreadPoolWorkItem.ExecuteWorkItem()      
{            
    this.ExecuteEntry(false);      
}
internal bool ExecuteEntry(bool bPreventDoubleExecution)
{

    if (!this.IsCancellationRequested && !this.IsCanceled)
    {
      this.ExecuteWithThreadLocal(ref Task.t_currentTask);
    }
    return true;
}
private void ExecuteWithThreadLocal(ref Task currentTaskSlot)
{
    Task task = currentTaskSlot;
    TplEtwProvider log = TplEtwProvider.Log;
    Guid currentThreadActivityId = default(Guid);
    bool flag = log.IsEnabled();
    try
    {
      currentTaskSlot = this;
      ExecutionContext capturedContext = this.CapturedContext;
      if (capturedContext == null)
      {
            this.Execute();
      }
      else
      {
            if (this.IsSelfReplicatingRoot || this.IsChildReplica)
            {
                this.CapturedContext = Task.CopyExecutionContext(capturedContext);
            }
            ContextCallback contextCallback = Task.s_ecCallback;
            if (contextCallback == null)
            {
                contextCallback = (Task.s_ecCallback = new ContextCallback(Task.ExecutionContextCallback));
            }
            // 这里是执行 Task 方法的地方
            ExecutionContext.Run(capturedContext, contextCallback, this, true);
      }
      this.Finish(true);
    }
    finally
    {
      currentTaskSlot = task;
    }
}await 之后的代码

那么 await 之后的代码是如何执行的呢?
大家回头看一下 MoveNext 方法,可以看到首次执行 MoveNext 的时候,代码的逻辑是这样的
if(num!=0) -> if(!awaiter.IsCompleted) -> AwaitUnsafeOnCompleted;return;关键就在于 AwaitUnsafeOnCompleted 这个方法,来看一下 AwaitUnsafeOnCompleted 方法。
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
    try
    {
      AsyncMethodBuilderCore.MoveNextRunner runner = null;
      // 这里是准备一个 任务执行完的回调,简单理解成是将 MoveNext 包装成一个 action
      Action completionAction = this.m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runner);
      if (this.m_coreState.m_stateMachine == null)
      {
            Task<TResult> task = this.Task;
            this.m_coreState.PostBoxInitialization(stateMachine, runner, task);
      }
                // 将 completionAction 推到 task里,具体看一下接下来的代码
      awaiter.UnsafeOnCompleted(completionAction);
    }
    catch (Exception exception)
    {
      AsyncMethodBuilderCore.ThrowAsync(exception, null);
    }
}
public void UnsafeOnCompleted(Action continuation)      
{            
    TaskAwaiter.OnCompletedInternal(this.m_task, continuation, true, false);      
}
internal static void OnCompletedInternal(Task task, Action continuation, bool continueOnCapturedContext, bool flowExecutionContext)
{
    task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext, ref stackCrawlMark);
}
internal void SetContinuationForAwait(Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext, ref StackCrawlMark stackMark)
{
   if (!this.AddTaskContinuation(continuationAction, false))
   {
            AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
   }
}
private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{
    //这里就把completionAction 放到了 Task 的 m_continuationObject 里
    return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers));
}至此为止,准备工作(将 MoveNext 放到 task 的回调里)已经做完了,接下来就是如何来触发这个方法。
简单分析一下,MoveNext 肯定是在 task 执行完触发,让我们回头看看 Task 执行的代码。会发现,哎嘿,在ExecutionContext.Run(capturedContext, contextCallback, this, true); 方法执行完后还有一句代码 this.Finish(true)。
internal void Finish(bool bUserDelegateExecuted)
{
    this.FinishStageTwo();
}
internal void FinishStageTwo()
{
    this.FinishStageThree();
}
internal void FinishStageThree()
{
    this.FinishContinuations();
}
internal void FinishContinuations()
{
    // 获取之前注册的所有回调最简单的就可以理解成 MoveNext
    object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);
    if (obj != null)
    {
      bool flag = (this.m_stateFlags & 134217728) == 0 && Thread.CurrentThread.ThreadState != ThreadState.AbortRequested && (this.m_stateFlags & 64) == 0;
      Action action = obj as Action;
      if (action != null)
      {
            AwaitTaskContinuation.RunOrScheduleAction(action, flag, ref Task.t_currentTask);
            return;
      }
    }
}
internal static void RunOrScheduleAction(Action action, bool allowInlining, ref Task currentTask)
{
    // 这里调用了MoveNextRunner.Run ,简单理解 就是MoveNext 方法
    action();
}
void IAsyncStateMachine.MoveNext()
{   
    // 此时 num 就是 0 了,就会执行 await 之后的代码了。
    int num = this.<>1__state;
    try
    {
      TaskAwaiter awaiter;
      if (num != 0)
      {}
      else
      {
      awaiter = this.<>u__1;
            this.<>u__1 = default(TaskAwaiter);
            this.<>1__state = -1;
      }
      awaiter.GetResult();
      Console.WriteLine("2" + Thread.CurrentThread.ManagedThreadId.ToString());
      Console.ReadLine();
    }
    catch (Exception exception)
    {
    }
    this.<>1__state = -2;
    this.<>t__builder.SetResult();
}这一段代码我删掉了很多内容,但大致的执行顺序就是如此,如果想要了解更多细节,可以查看 Task 这个类的源码。
总结

回头看看开头的两个问题,现在就可以回答了。

[*]await 之后的方法是何时执行,如何执行的?
await 的方法在 Task 执行完成之后,通过调用 Finish 方法执行的。
具体的执行步骤是先将 MoveNext 方法注册到 Task 的回调里,然后在 Task 执行完后调用这个方法。
[*]为什么 await 之后的代码会在不同的线程执行?
这个其实是因为 Task 的机制,Task 会被推到线程池里,由线程池挑选一个线程去执行,await 之后的代码其实是由这个线程去执行的,自然就跟 await 的之前的代码不是一个线程。
PS: 补一张图


来源:https://www.cnblogs.com/ccwzl/archive/2023/08/16/17636189.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: async_await 源码分析