Async/Await在 C#语言中是如何工作的

您所在的位置:网站首页 listen的结构 Async/Await在 C#语言中是如何工作的

Async/Await在 C#语言中是如何工作的

2023-04-22 12:53| 来源: 网络整理| 查看: 265

Async/Await在 C#语言中是如何工作的

Songjie Cai

April 18th, 20230 0

本文翻译于Stephen Toub的这篇英文文章:How Async/Await Really Works in C# – .NET Blog (microsoft.com)

几周前,.NET Blog刊登了一篇文章 什么是 .NET,为什么要选择它?。它提供了对平台的高层次概述,总结了各种组件和设计决策,并承诺对所涉及的领域发表更深入的文章。这是第一篇这样的后续文章,深入探讨了C#和.NET中async/await的历史、背后的设计决策和实现细节。

对async/await的支持已经存在了十年之久。在这段时间里,它改变了为.NET编写可扩展代码的方式,而在不了解其底层逻辑的情况下使用该功能是可行的,也是非常常见的。你可以从一个像下面这样的同步方法开始(这个方法是“同步的”,因为调用者将无法做其他任何事情,直到整个操作完成并将控制权返回给调用者):

// Synchronously copy all data from source to destination. public void CopyStreamToStream(Stream source, Stream destination) { var buffer = new byte[0x1000]; int numRead; while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0) { destination.Write(buffer, 0, numRead); } }

然后添加几个关键字,更改几个方法名称,最终得到以下异步方法(这个方法是 “异步 “的,因为控制权会很快返回给它的调用者,而且可能在与整个操作相关的工作完成之前):

// Asynchronously copy all data from source to destination. public async Task CopyStreamToStreamAsync(Stream source, Stream destination) { var buffer = new byte[0x1000]; int numRead; while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0) { await destination.WriteAsync(buffer, 0, numRead); } }

语法上几乎相同,仍然能够利用所有相同的控制流结构,但现在是非阻塞性的,具有明显不同的底层执行模型,而且所有繁重的工作都由C#编译器和核心库为你完成。

 

虽然在不知道底层逻辑的情况下使用这种支持是很常见的,但我坚信,了解某些东西的实际工作原理有助于你更好地使用它。特别是对于async/await来说,当你想看清表面之下的东西时,了解其中的机制特别有帮助,比如当你试图调试出错的东西或提高正常运行的代码的性能时。那么,在这篇文章中,我们将深入探讨await在语言、编译器和库级别的工作原理,以便你可以充分利用这些有价值的功能。

不过,要做到这一点,我们需要追溯到async/await之前,以了解在没有它的情况下最先进的异步代码是什么样子的。友情提示,它并不漂亮。

一开始…

早在.NET Framework 1.0中,就有异步编程模型模式,又称APM模式、Begin/End模式、IAsyncResult模式。在高层次上,该模式很简单。对于同步操作DoStuff:

class Handler { public int DoStuff(string arg); }

作为模式的一部分,将有两个相应的方法:BeginDoStuff 方法和 EndDoStuff 方法:

class Handler { public int DoStuff(string arg); public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state); public int EndDoStuff(IAsyncResult asyncResult); }

BeginDoStuff会像DoStuff一样接受所有相同的参数,但除此之外,它还会接受AsyncCallback委托和一个不透明的状态对象,其中一个或两个都可以为null。Begin方法负责初始化异步操作,如果提供了回调(通常称为初始操作的“延续”),它还负责确保在异步操作完成时调用回调。Begin方法还将构造一个实现了IAsyncResult的类型实例,使用可选状态填充IAsyncResult的AsyncState属性:

namespace System { public interface IAsyncResult { object? AsyncState { get; } WaitHandle AsyncWaitHandle { get; } bool IsCompleted { get; } bool CompletedSynchronously { get; } } public delegate void AsyncCallback(IAsyncResult ar); }

然后,这个IAsyncResult实例将从Begin方法返回,并在最终调用AsyncCallback时传递给它。当准备使用操作的结果时,调用者将把IAsyncResult实例传递给End方法,该方法负责确保操作已完成(如果没有完成,则通过阻塞同步等待操作完成),然后返回操作的任何结果,包括传播可能发生的任何错误/异常。因此,不用像下面这样写代码来同步执行操作:

try { int i = handler.DoStuff(arg); Use(i); } catch (Exception e) { ... // handle exceptions from DoStuff and Use }

可以按以下方式使用 Begin/End 方法异步执行相同的操作:

try { handler.BeginDoStuff(arg, iar => { try { Handler handler = (Handler)iar.AsyncState!; int i = handler.EndDoStuff(iar); Use(i); } catch (Exception e2) { ... // handle exceptions from EndDoStuff and Use } }, handler); } catch (Exception e) { ... // handle exceptions thrown from the synchronous call to BeginDoStuff }

对于在任何语言中处理过基于回调的API的人来说,这应该感觉很熟悉。

然而,事情从此变得更加复杂。例如,有一个 “stack dives”的问题。stack dives是指代码反复调用,在堆栈中越陷越深,以至于可能出现堆栈溢出。如果操作同步完成,Begin方法被允许同步调用回调,这意味着对Begin的调用本身可能直接调用回调。同步完成的 “异步 “操作实际上是很常见的;它们不是 “异步”,因为它们被保证异步完成,而只是被允许这样做。例如,考虑从一些网络化的操作中进行异步读取,比如从套接字中接收。如果你对每个单独的操作只需要少量的数据,比如从一个响应中读取一些标头数据,则可以在适当的位置设置缓冲区,以避免大量系统调用的开销。不是立即对需要的数据量进行小的读取,而是对缓冲区执行更大的读取,然后从缓冲区中使用数据,直到缓冲区耗尽;不是立即对需要的数据量进行小的读取,而是对缓冲区执行更大的读取,然后从缓冲区中使用数据,直到缓冲区耗尽;

这是一种真实的可能性,很容易再现。在.NET Core上试试这个程序:

using System.Net; using System.Net.Sockets; using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); listener.Listen(); using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); client.Connect(listener.LocalEndPoint!); using Socket server = listener.Accept(); _ = server.SendAsync(new byte[100_000]); var mres = new ManualResetEventSlim(); byte[] buffer = new byte[1]; var stream = new NetworkStream(client); void ReadAgain() { stream.BeginRead(buffer, 0, 1, iar => { if (stream.EndRead(iar) != 0) { ReadAgain(); // uh oh! } else { mres.Set(); } }, null); }; ReadAgain(); mres.Wait();

在这里,我设置了一个相互连接的简单客户端套接字和服务器套接字。服务器向客户端发送100,000字节,然后客户端继续使用BeginRead/EndRead来“异步”地每次读取一个字节(这是非常低效的,而且只是以教学的名义这么做)。传给BeginRead的回调函数通过调用EndRead来完成读取,然后如果它成功读取了所需的字节(这种情况下还没有到达流的结尾),它会通过递归调用ReadAgain局部函数来发出另一个BeginRead。然而,在 .net Core中,套接字操作比在 .net Framework上快得多,并且如果操作系统能够满足同步操作,它将同步完成(注意内核本身有一个缓冲区用于满足套接字接收操作)。因此,这个堆栈会溢出:Image BeginReadStackOverflow 1

因此,APM模型中内置了补偿机制。有两种可能的方法可以弥补这一点:

不要允许AsyncCallback被同步调用。如果一直异步调用它,即使操作以同步方式完成,那么stack dives的风险也会消失。但是性能也是如此,因为同步完成的操作(或者快到无法观察到它们的区别)是非常常见的,强迫每个操作排队回调会增加可测量的开销。 使用一种机制,允许调用方而不是回调方在操作同步完成时执行延续工作。这样,您就可以避开额外的方法框架,继续执行后续工作,而不深入堆栈。

APM模式与option(2)一起使用。为此,IAsyncResult接口公开了两个相关但不同的成员:IsCompleted和CompletedSynchronously。IsCompleted告诉你操作是否已经完成:你可以多次检查它,最终它会从false转换为true,然后保持不变。相比之下,CompletedSynchronously永远不会改变(如果改变了,那就是一个令人讨厌的bug);它用于Begin方法的调用者和AsyncCallback之间的通信,他们中的一个负责执行任何延续工作。如果CompletedSynchronously为false,则操作是异步完成的,响应操作完成的任何后续工作都应该留给回调;毕竟,如果工作没有同步完成,Begin的调用方无法真正处理它,因为还不知道操作已经完成(如果调用方只是调用End,它将阻塞直到操作完成)。然而,如果CompletedSynchronously为真,如果回调要处理延续工作,那么它就有stack dives的风险,因为它将在堆栈上执行比开始时更深的延续工作。因此,任何涉及到这种堆栈潜水的实现都需要检查CompletedSynchronously,并让Begin方法的调用者执行延续工作(如果它为真),这意味着回调不需要执行延续工作。这也是CompletedSynchronously永远不能更改的原因:调用方和回调方需要看到相同的值,以确保不管竞争条件如何,延续工作只执行一次。

在我们之前的DoStuff示例中,就会导致代码像这样:

try { IAsyncResult ar = handler.BeginDoStuff(arg, iar => { if (!iar.CompletedSynchronously) { try { Handler handler = (Handler)iar.AsyncState!; int i = handler.EndDoStuff(iar); Use(i); } catch (Exception e2) { ... // handle exceptions from EndDoStuff and Use } } }, handler); if (ar.CompletedSynchronously) { int i = handler.EndDoStuff(ar); Use(i); } } catch (Exception e) { ... // handle exceptions that emerge synchronously from BeginDoStuff and possibly EndDoStuff/Use

太拗口了。到目前为止,我们只研究了如何使用该模式……还没有研究如何实现该模式。虽然大多数开发人员不需要关心叶子操作(例如,实现与操作系统交互的实际Socket.BeginReceive/EndReceive方法),但很多很多开发人员需要关心这些操作的组合(执行多个异步操作,形成一个更大的操作),这意味着不仅要使用其他Begin/End方法,还需要自己去实现它们,以便你的组合本身可以在其他地方使用。你会注意到,在我之前的DoStuff示例中没有控制流。如果引入多种操作,尤其是像循环这样简单的控制流,突然之间,这就像进入到了那些享受痛苦的专家或博客作者的领域。

为了深入理解这一点,我们来实现一个完整的例子。在这篇文章的开始,我展示了一个CopyStreamToStream方法,它将所有数据从一个流复制到另一个流(以Stream.CopyTo的方式,但是为了解释,假设它不存在):

public void CopyStreamToStream(Stream source, Stream destination) { var buffer = new byte[0x1000]; int numRead; while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0) { destination.Write(buffer, 0, numRead); } }

简单来说:我们反复的从一个流中读取数据,然后将产生的数据写到另一个流中,从一个流中读取并写到另一个流中,如此反复,直到我们没有更多的数据可以读取。现在,我们如何使用APM模式异步实现这个功能呢?像这样:

public IAsyncResult BeginCopyStreamToStream( Stream source, Stream destination, AsyncCallback callback, object state) { var ar = new MyAsyncResult(state); var buffer = new byte[0x1000]; Action readWriteLoop = null!; readWriteLoop = iar => { try { for (bool isRead = iar == null; ; isRead = !isRead) { if (isRead) { iar = source.BeginRead(buffer, 0, buffer.Length, static readResult => { if (!readResult.CompletedSynchronously) { ((Action)readResult.AsyncState!)(readResult); } }, readWriteLoop); if (!iar.CompletedSynchronously) { return; } } else { int numRead = source.EndRead(iar!); if (numRead == 0) { ar.Complete(null); callback?.Invoke(ar); return; } iar = destination.BeginWrite(buffer, 0, numRead, writeResult => { if (!writeResult.CompletedSynchronously) { try { destination.EndWrite(writeResult); readWriteLoop(null); } catch (Exception e2) { ar.Complete(e); callback?.Invoke(ar); } } }, null); if (!iar.CompletedSynchronously) { return; } destination.EndWrite(iar); } } } catch (Exception e) { ar.Complete(e); callback?.Invoke(ar); } }; readWriteLoop(null); return ar; } public void EndCopyStreamToStream(IAsyncResult asyncResult) { if (asyncResult is not MyAsyncResult ar) { throw new ArgumentException(null, nameof(asyncResult)); } ar.Wait(); } private sealed class MyAsyncResult : IAsyncResult { private bool _completed; private int _completedSynchronously; private ManualResetEvent? _event; private Exception? _error; public MyAsyncResult(object? state) => AsyncState = state; public object? AsyncState { get; } public void Complete(Exception? error) { lock (this) { _completed = true; _error = error; _event?.Set(); } } public void Wait() { WaitHandle? h = null; lock (this) { if (_completed) { if (_error is not null) { throw _error; } return; } h = _event ??= new ManualResetEvent(false); } h.WaitOne(); if (_error is not null) { throw _error; } } public WaitHandle AsyncWaitHandle { get { lock (this) { return _event ??= new ManualResetEvent(_completed); } } } public bool CompletedSynchronously { get { lock (this) { if (_completedSynchronously == 0) { _completedSynchronously = _completed ? 1 : -1; } return _completedSynchronously == 1; } } } public bool IsCompleted { get { lock (this) { return _completed; } } } }

出人意料。而且,即使有那么多官样文章,它仍然不是一个很好的实现。例如,IAsyncResult实现是锁定每个操作,而不是在可能的情况下以更无锁的方式进行操作,异常被原始存储,而不是作为ExceptionDispatchInfo,可以在传播时增加其调用堆栈,每个单独的操作涉及大量的分配(例如,为每个BeginWrite调用分配一个委托),等等。现在,想象一下,你必须为你想要编写的每个方法执行所有这些操作。每次你想编写一个可复用的方法来处理另一个异步操作时,都需要完成所有这些工作。如果你想编写可复用的组合器,并且能够高效地对多个离散的IAsyncResults进行操作(想想Task.WhenAll),这又是另一个难度级别;每个操作都实现并公开了它自己的特定于该操作的 API,这意味着没有一种通用语言可以类似地讨论它们(尽管一些开发人员编写了库,试图减轻负担,通常通过另一层回调,使API能够为Begin方法提供适当的AsyncCallback)。

所有这些复杂情况意味着很少有人会尝试这样做,而对于那些尝试的人来说,bug很猖獗。公平地说,这并不是对APM模式的批评。相反,它一般是对基于回调的异步的批评。我们都习惯了现代语言中的控制流结构为我们提供的强大和简单性,一旦引入了任何合理的复杂性,而基于回调的方法通常会与这种结构相冲突。其他主流语言也没有更好的替代方案。

我们需要一种更好的方法,一种从APM模式中学习的方法,融合它正确的东西,同时避免它的陷阱。值得注意的是,APM模式只是一种模式; 运行时间、核心库和编译器在使用或实现该模式时并没有提供任何帮助。

基于事件的异步模式

. net Framework 2.0引入了一些api,实现了处理异步操作的不同模式,这种模式主要用于在客户端应用程序上下文中处理异步操作。这种基于事件的异步模式或 EAP 也作为一对成员出现(至少,可能更多),这次是一个用于初始化异步操作的方法和一个用于侦听其完成的事件。因此,我们之前的DoStuff示例可能被公开为一组成员,如下所示:

class Handler { public int DoStuff(string arg); public void DoStuffAsync(string arg, object? userToken); public event DoStuffEventHandler? DoStuffCompleted; } public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e); public class DoStuffEventArgs : AsyncCompletedEventArgs { public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) : base(error, canceled, usertoken) => Result = result; public int Result { get; } }

你需要用DoStuffCompleted事件注册你的继续工作,然后调用DoStuffAsync方法;它将启动该操作,并且在该操作完成时,调用者将异步地引发DoStuffCompleted事件。然后,处理程序可以继续执行后续工作,可能会验证所提供的userToken与它所期望的进行匹配,从而允许多个处理程序同时连接到事件。

这种模式使一些用例变得更简单,同时使其他用例变得更加困难(考虑到前面的APM CopyStreamToStream示例,这说明了一些问题)。它没有以广泛的方式推出,只是在一个单独的 .NET Framework版本中匆匆的出现又消失了,尽管留下了它使用期间添加的api,如Ping.SendAsync/Ping.PingCompleted:

public class Ping : Component { public void SendAsync(string hostNameOrAddress, object? userToken); public event PingCompletedEventHandler? PingCompleted; ... }

然而,它确实取得了一个APM模式完全没有考虑到的显著进步,并且这一点一直延续到我们今天所接受的模型中: SynchronizationContext。

在. net Framework 2.0中还引入了SynchronizationContext,作为通用调度器的抽象概念。特别是,SynchronizationContext最常用的方法是Post,它将一个工作项排到该上下文所代表的任何调度器中。例如,SynchronizationContext的基本实现只代表ThreadPool,因此 base implementation of SynchronizationContext.Post只是委托给ThreadPool.QueueUserWorkItem,用于要求线程池调用所提供的回调,并在池中的一个线程上使用关联状态。然而SynchronizationContext的基本功能不仅仅是支持任意的调度器,而是支持以根据各种应用程序模型的需要工作的方式进行调度。

考虑到像Windows Forms这样的UI框架。与Windows上的大多数UI框架一样,控件与特定的线程相关联,该线程运行一个消息泵,该消息泵运行能够与这些控件交互的工作: 只有该线程应该尝试操作这些控件,而任何其他想要与控件交互的线程都应该通过发送消息由UI线程的泵消耗来完成操作。Windows窗体使用ControlBeginInvoke等方法使这变得很容易,它将提供的委托和参数排队,由与该控件相关联的任何线程运行。因此,你可以这样编写代码:

private void button1_Click(object sender, EventArgs e) { ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); button1.BeginInvoke(() => { button1.Text = message; }); }); }

这将卸载在ThreadPool线程上完成的ComputeMessage()工作(以便在处理UI的过程中保持UI的响应性),然后在工作完成时,将委托队列返回到与button1相关的线程,以更新button1的标签。这很简单,WPF也有类似的东西,只是用它的Dispatcher类型:

private void button1_Click(object sender, RoutedEventArgs e) { ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); button1.Dispatcher.InvokeAsync(() => { button1.Content = message; }); }); }

. net MAUI也有类似的功能。但如果我想把这个逻辑放到辅助方法中呢?

// Call ComputeMessage and then invoke the update action to update controls. internal static void ComputeMessageAndInvokeUpdate(Action update) { ... }

然后我可以这样使用它:

private void button1_Click(object sender, EventArgs e) { ComputeMessageAndInvokeUpdate(message => button1.Text = message); }

但是如何实现ComputeMessageAndInvokeUpdate,使其能够在这些应用程序中工作呢?是否需要硬编码才能了解每个可能的UI框架?这就是SynchronizationContext的魅力所在。我们可以这样实现这个方法:

internal static void ComputeMessageAndInvokeUpdate(Action update) { SynchronizationContext? sc = SynchronizationContext.Current; ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); if (sc is not null) { sc.Post(_ => update(message), null); } else { update(message); } }); }

它使用SynchronizationContext作为一个抽象,目标是任何“调度器”,应该用于回到与UI交互的必要环境。然后,每个应用程序模型确保它作为SynchronizationContext.Current发布一个SynchronizationContext-derived 类型,去做 “正确的事情”。例如,Windows Forms有这个:

public sealed class WindowsFormsSynchronizationContext : SynchronizationContext, IDisposable { public override void Post(SendOrPostCallback d, object? state) => _controlToSendTo?.BeginInvoke(d, new object?[] { state }); ... }

WPF有这个:

public sealed class DispatcherSynchronizationContext : SynchronizationContext { public override void Post(SendOrPostCallback d, Object state) => _dispatcher.BeginInvoke(_priority, d, state); ... }

ASP.NET曾经有一个,它实际上并不关心工作在什么线程上运行,而是关心给定的请求相关的工作被序列化,这样多个线程就不会并发地访问给定的HttpContext:

internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase { public override void Post(SendOrPostCallback callback, Object state) => _state.Helper.QueueAsynchronous(() => callback(state)); ... }

这也不限于这些主要的应用程序模型。例如,xunit是一个流行的单元测试框架,是.NET核心存储库用于单元测试的框架,它也采用了多个自定义的SynchronizationContext。例如,你可以允许并行运行测试,但限制允许并发运行的测试数量。这是如何实现的呢?通过SynchronizationContext:

public class MaxConcurrencySyncContext : SynchronizationContext, IDisposable { public override void Post(SendOrPostCallback d, object? state) { var context = ExecutionContext.Capture(); workQueue.Enqueue((d, state, context)); workReady.Set(); } }

MaxConcurrencySyncContext的Post方法只是将工作排到自己的内部工作队列中,然后在它自己的工作线程上处理它,它根据所需的最大并发数来控制有多少工作线程。你会明白的。

这与基于事件的异步模式有什么联系?EAP和SynchronizationContext是同时引入的,当异步操作被启动时,EAP规定完成事件应该排队到当前任何SynchronizationContext中。为了稍微简化一下(可能不足以保证额外的复杂性),System.ComponentModel中也引入了一些辅助类型,尤其是AsyncOperation和AsyncOperationManager。前者只是一个元组,封装了用户提供的状态对象和捕获的SynchronizationContext,而后者只是作为一个简单的工厂来捕获并创建AsyncOperation实例。然后EAP实现将使用这些,例如Ping.SendAsync调用AsyncOperationManager.CreateOperation来捕获SynchronizationContext。当操作完成时,AsyncOperation的PostOperationCompleted方法将被调用,以调用存储的SynchronizationContext的Post方法。

 

SynchronizationContext还提供了一些值得一提的小功能,因为它们稍后会再次出现。特别是,它公开了OperationStarted和OperationCompleted方法。这些虚函数的基本实现是空的,什么都不做,但派生实现可能会覆盖它们,以了解运行中的操作。这意味着EAP实现也会在每个操作的开始和结束时调用OperationStarted/OperationCompleted,以便通知任何存在的SynchronizationContext,并允许它跟踪工作。这与EAP模式特别相关,因为发起异步操作的方法是void返回的:您没有得到任何返回,因此允许你单独跟踪工作。我们会回到这个问题上。

 

因此,我们需要比APM模式更好的东西,接下来出现的EAP引入了一些新的事务,但并没有真正解决我们面临的核心问题。我们仍然需要更好的东西。

输入任务

. net Framework 4.0引入了System.Threading.Tasks.Task类型。从本质上讲,Task只是一个数据结构,表示某些异步操作的最终完成(其他框架将类似的类型称为“promise”或“future”)。创建Task是为了表示某些操作,然后当它表示的操作逻辑上完成时,结果存储到该Task中。很简单。但是Task提供的关键特性使它比IAsyncResult更有用,它在自己内部内置了continuation的概念。这一特性意味着您可以访问任何Task,并在其完成时请求异步通知,由任务本身处理同步,以确保继续被调用,无论任务是否已经完成、尚未完成、还是与通知请求同时完成。为什么会有如此大的影响?如果你还记得我们对旧APM模式的讨论,有两个主要问题。

你必须为每个操作实现一个自定义的IAsyncResult实现:没有内置的IAsyncResult实现,任何人都可以根据需要使用。 在Begin方法被调用之前,你必须知道当它完成时要做什么。这使得实现组合器和其他用于消耗和组合任意异步实现的通用例程成为一个重大挑战。

相比之下,在Task中,这种共享表示方式让你在启动操作后继续执行异步操作,并在启动操作后提供continuation……你不需要为启动操作的方法提供continuation。每个进行异步操作的人都可以生成一个Task,每个使用异步操作的人都可以使用一个Task,并且不需要任何自定义操作将两者连接起来:Task成为使异步操作的生产者和消费者交谈的通用语言。这改变了.net的面貌。稍后会详细介绍……

 

现在,让我们更好地理解它的实际含义。我们将做一些教学上的事情,并实现一个简单的版本,而不是深入到Task复杂的代码中。这并不是一个很好的实现,只是在功能上足够完整,足以帮助理解什么是Task,最终,它实际上只是一个处理协调设置和接收完成信号的数据结构。我们先从几个字段开始:

class MyTask { private bool _completed; private Exception? _error; private Action? _continuation; private ExecutionContext? _ec; ... }

我们需要一个字段来知道任务是否完成(_completed),还需要一个字段来存储导致任务失败的任何错误(_error);如果我们还要实现一个通用的MyTask,那么也会有一个私有的TResult _result字段,用于存储操作的成功结果。到目前为止,这看起来很像我们之前自定义的IAsyncResult实现(当然,这不是巧合)。但是现在最重要的部分,是 _continuation字段。在这个简单的实现中,我们只支持一个continuation,但对于解释目的来说这已经足够了(真正的任务使用了一个对象字段,该字段可以是单个continuation对象,也可以是continuation对象的List)。这是一个委托,将在任务完成时调用。

现在,有一些表面的东西。如前所述,与以前的模型相比,Task的一个基本进步是能够在操作开始后提供延续工作(回调)。我们需要一个方法来做到这一点,所以让我们添加ContinueWith:

public void ContinueWith(Action action) { lock (this) { if (_completed) { ThreadPool.QueueUserWorkItem(_ => action(this)); } else if (_continuation is not null) { throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation."); } else { _continuation = action; _ec = ExecutionContext.Capture(); } } }

如果任务在ContinueWith被调用时已经被标记为完成,ContinueWith只是排队执行委托。否则,该方法将存储该委托,以便在任务完成时可以排队继续执行(它还存储了一个叫做ExecutionContext的东西,然后在以后调用该委托时使用它,但现在不要担心这部分……我们会讨论它)。很简单。

 

然后,我们需要能够将MyTask标记为完成,这意味着它所代表的异步操作已经完成。为此,我们将提供两个方法,一个用于标记完成(” SetResult “),另一个用于标记完成并返回错误(” SetException “):

public void SetResult() => Complete(null); public void SetException(Exception error) => Complete(error); private void Complete(Exception? error) { lock (this) { if (_completed) { throw new InvalidOperationException("Already completed"); } _error = error; _completed = true; if (_continuation is not null) { ThreadPool.QueueUserWorkItem(_ => { if (_ec is not null) { ExecutionContext.Run(_ec, _ => _continuation(this), null); } else { _continuation(this); } }); } } }

我们存储任何错误,将任务标记为已完成,然后如果之前已经注册了continuation,则将其排队等待调用。

 

最后,我们需要一种方法来传播任务中可能发生的任何异常(并且,如果这是一个泛型MyTask,则返回其_result);为了方便某些情况,我们还允许此方法阻塞等待任务完成,这可以通过ContinueWith实现(continuation 只是发出ManualResetEventSlim信号,然后调用者阻塞等待完成)。

public void Wait() { ManualResetEventSlim? mres = null; lock (this) { if (!_completed) { mres = new ManualResetEventSlim(); ContinueWith(_ => mres.Set()); } } mres?.Wait(); if (_error is not null) { ExceptionDispatchInfo.Throw(_error); } }

基本上就是这样。现在可以肯定的是,真正的Task要复杂得多,有更高效的实现,支持任意数量的continuation,有大量关于它应该如何表现的按钮(例如,continuation应该像这里所做的那样排队,还是应该作为任务完成的一部分同步调用),能够存储多个异常而不是一个异常,具有取消的特殊知识,有大量的辅助方法用于执行常见操作(例如Task.Run,它创建一个Task来表示线程池上调用的委托队列),等等。但这些都没有什么神奇之处;其核心就是我们在这里看到的东西。

你可能还注意到,我简单的MyTask直接有公共的SetResult/SetException方法,而Task没有。实际上,Task确实有这样的方法,它们只是内部的,System.Threading.Tasks.TaskCompletionSource类型作为任务及其完成的独立“生产者”;这样做不是出于技术上的需要,而是为了让完成方法远离只用于消费的东西。然后,你就可以把Task分发出去,而不必担心它会在你下面完成;完成信号是创建任务的实现细节,并且通过保留TaskCompletionSource本身来保留完成它的权利。(CancellationToken和CancellationTokenSource遵循类似的模式:CancellationToken只是CancellationTokenSource的一个结构封装器,只提供与消费取消信号相关的公共区域,但没有产生取消信号的能力,而产生取消信号的能力仅限于能够访问CancellationTokenSource的人。)

当然,我们可以为这个MyTask实现组合器和辅助器,就像Task提供的那样。想要一个简单的MyTask.WhenAll?给你:

public static MyTask WhenAll(MyTask t1, MyTask t2) { var t = new MyTask(); int remaining = 2; Exception? e = null; Action continuation = completed => { e ??= completed._error; // just store a single exception for simplicity if (Interlocked.Decrement(ref remaining) == 0) { if (e is not null) t.SetException(e); else t.SetResult(); } }; t1.ContinueWith(continuation); t2.ContinueWith(continuation); return t; }

想要一个MyTask.Run?你得到了它:

public static MyTask Run(Action action) { var t = new MyTask(); ThreadPool.QueueUserWorkItem(_ => { try { action(); t.SetResult(); } catch (Exception e) { t.SetException(e); } }); return t; }

一个MyTask.Delay怎么样?当然可以:

public static MyTask Delay(TimeSpan delay) { var t = new MyTask(); var timer = new Timer(_ => t.SetResult()); timer.Change(delay, Timeout.InfiniteTimeSpan); return t; }

你懂的。

有了Task, . net中之前的所有异步模式都将成为过去。在以前使用APM模式或EAP模式实现异步实现的地方,都会公开新的Task返回方法。

 

ValueTasks

时至今日,Task仍然是. net中异步的主动力,每个版本都公开了新的方法,并且在整个生态系统中定期返回Task和Task。然而,Task是一个类,这意味着创建一个类需要分配内存。在大多数情况下,为一个长期异步操作额外分配内存是微不足道的,除了对性能最敏感的操作之外,不会对产生有意义的影响。不过,如前所述,异步操作的同步完成是相当常见的。

时至今日,Task仍然是. net中异步处理的主力,每次发布都有新方法公开,并且在整个生态系统中都例行地返回Task和Task。然而,Task是一个类,这意味着创建一个类需要分配内存。在大多数情况下,为一个长期异步操作额外分配内存是微不足道的,除了对性能最敏感的操作之外,不会对所有操作的性能产生有意义的影响。不过,如前所述,异步操作的同步完成是相当常见的。引入Stream.ReadAsync是为了返回一个 Task,但如果你从一个BufferedStream中读取数据,很有可能很多读取都是同步完成的,因为只需要从内存中的缓冲区中读取数据,而不是执行系统调用和真正的I/O操作。不得不分配一个额外的对象来返回这样的数据是不幸的(注意,APM也是这样的情况)。对于返回Task的非泛型方法,该方法可以只返回一个已经完成的单例任务,而实际上Task.CompletedTask提供了一个这样的单例Task。但对于Task来说,不可能为每个可能的结果缓存一个Task。我们可以做些什么来让这种同步完成更快呢?

 

缓存一些Task是可能的。例如,Task非常常见,而且只有两个有意义的东西需要缓存:当结果为true时,一个Task,当结果为false时,一个Task。或者,虽然我们不想缓存40亿个Task来容纳所有可能的Int32结果,但小的Int32值是非常常见的,因此我们可以缓存一些值,比如-1到8。或者对于任意类型,default是一个合理的通用值,因此我们可以缓存每个相关类型的Task,其中Result为default(TResult)。事实上,Task.FromResult今天也是这样做的 (从最近的.NET版本开始),使用一个小型的可复用的Task单例缓存,并在适当时返回其中一个,或者为准确提供的结果值分配一个新的Task。可以创建其他方案来处理其他合理的常见情况。例如,当使用Stream.ReadAsync时,在同一个流上多次调用它是合理的,而且每次调用时允许读取的字节数都是相同的。实现能够完全满足count请求是合理的。这意味着Stream.ReadAsync重复返回相同的int值是很常见的。为了避免这种情况下的多次分配,多个Stream类型(如MemoryStream)会缓存它们最后成功返回的Task,如果下一次读取也同步完成并成功获得相同的结果,它可以只是再次返回相同的Task,而不是创建一个新的。但其他情况呢?在性能开销非常重要的情况下,如何更普遍地避免对同步完成的这种分配?

 

这就是ValueTask的作用(还有关于ValueTask 的更详细的研究也可以找到)。ValueTask最初是作为TResult和Task之间的一个区分并集。说到底,抛开那些花哨的东西,这就是它的全部 (或者,更确切地说,曾经是),是一个即时的结果,或者是对未来某个时刻的一个结果的承诺:

public readonly struct ValueTask { private readonly Task? _task; private readonly TResult _result; ... }

然后,一个方法可以返回这样一个ValueTask,而不是一个Task,如果TResult在需要返回的时候已经知道了,那么就可以避免Task的分配,代价是一个更大的返回类型和稍微多一点间接性。

然而,在一些超级超级极端的高性能场景中,即使在异步完成的情况下,您也希望能够避免Task分配。例如,Socket位于网络堆栈的底部,Socket上的SendAsync和ReceiveAsync对于许多服务来说是非常热门的路径,同步和异步完成都非常常见(大多数同步发送完成,许多同步接收完成,因为数据已经在内核中缓冲了)。如果在一个给定的Socket上,我们可以使这样的发送和接收不受分配限制,而不管操作是同步完成还是异步完成,这不是很好吗?

 

这就是System.Threading.Tasks.Sources.IValueTaskSource进入的地方:

public interface IValueTaskSource { ValueTaskSourceStatus GetStatus(short token); void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags); TResult GetResult(short token); }

IValueTaskSource接口允许一个实现为ValueTask提供自己的支持对象,使该对象能够实现像GetResult这样的方法来检索操作的结果,以及OnCompleted来连接操作的延续。就这样,ValueTask对其定义进行了一个小小的更改,其Task? _task字段替换为object? _obj字段:

public readonly struct ValueTask { private readonly object? _obj; private readonly TResult _result; ... }

以前_task字段要么是Task要么是null,现在_obj字段也可以是IValueTaskSource。一旦Task被标记为已完成,它将保持完成状态,并且永远不会转换回未完成的状态。相比之下,实现 IValueTaskSource 的对象对实现有完全的控制权,可以自由地在完成状态和不完成状态之间双向转换,因为 ValueTask 的契约是一个给定的实例只能被消耗一次,因此从结构上看,它不应该观察到底层实例的消耗后变化(这就是 CA2012等分析规则存在的原因)。这就使得像Socket这样的类型能够将IValueTaskSource的实例集中起来,用于重复调用。Socket最多可以缓存两个这样的实例,一个用于读,一个用于写,因为99.999%的情况是在同一时间最多只有一个接收和一个发送。

我提到了ValueTask,但没有提到ValueTask。当只处理避免同步完成的分配时,使用非泛型ValueTask(代表无结果的无效操作)在性能上没有什么好处,因为同样的条件可以用Task.CompletedTask来表示。但是,一旦我们关心在异步完成的情况下使用可池化的底层对象来避免分配的能力,那么这对非泛型也很重要。因此,当IValueTaskSource被引入时,IValueTaskSource和ValueTask也被引入。

因此,我们有Task、Task、ValueTask和ValueTask。我们能够以各种方式与它们交互,表示任意的异步操作,并连接continuation来处理这些异步操作的完成。没错,我们可以在操作完成之前或之后执行这些操作。

但是……这些延续仍然是回调!

我们仍然被迫采用延续传递的方式来编码我们的异步控制流!!!

这仍然很难做到!!

我们如何才能解决这个问题????

 

C# Iterators to the Rescue

这个解决方案的一线希望实际上是在Task出现的几年前,即C# 2.0,当时它增加了对迭代器的支持。

“迭代器?“你会问?“你是说IEnumerable?”就是这个。迭代器允许你编写一个方法,然后由编译器用来实现IEnumerable和/或IEnumerator。例如,如果我想创建一个产生斐波那契数列的枚举数,我可以这样写:

public static IEnumerable Fib() { int prev = 0, next = 1; yield return prev; yield return next; while (true) { int sum = prev + next; yield return sum; prev = next; next = sum; } }

然后我可以用foreach枚举它:

foreach (int i in Fib()) { if (i > 100) break; Console.Write($"{i} "); }

我可以通过像System.Linq.Enumerable上的组合器将它与其他IEnumerable进行组合:

foreach (int i in Fib().Take(12)) { Console.Write($"{i} "); }

或者我可以直接通过IEnumerator来手动枚举它:

using IEnumerator e = Fib().GetEnumerator(); while (e.MoveNext()) { int i = e.Current; if (i > 100) break; Console.Write($"{i} "); }

以上所有的结果是这样的输出:

0 1 1 2 3 5 8 13 21 34 55 89

真正有趣的是,为了实现上述目标,我们需要能够多次进入和退出Fib方法。我们调用MoveNext,它进入方法,然后该方法执行,直到它遇到yield return,此时对MoveNext的调用需要返回true,随后对Current的访问需要返回yield value。然后我们再次调用MoveNext,我们需要能够在Fib中从我们上次停止的地方开始,并且保持上次调用的所有状态不变。迭代器实际上是由c#语言/编译器提供的协程,编译器将Fib迭代器扩展为一个成熟的状态机:

public static IEnumerable Fib() => new d__0(-2); [CompilerGenerated] private sealed class d__0 : IEnumerable, IEnumerable, IEnumerator, IEnumerator, IDisposable { private int 1__state; private int 2__current; private int l__initialThreadId; private int 5__2; private int 5__3; private int 5__4; int IEnumerator.Current => 2__current; object IEnumerator.Current => 2__current; public d__0(int 1__state) { this.1__state = 1__state; l__initialThreadId = Environment.CurrentManagedThreadId; } private bool MoveNext() { switch (1__state) { default: return false; case 0: 1__state = -1; 5__2 = 0; 5__3 = 1; 2__current = 5__2; 1__state = 1; return true; case 1: 1__state = -1; 2__current = 5__3; 1__state = 2; return true; case 2: 1__state = -1; break; case 3: 1__state = -1; 5__2 = 5__3; 5__3 = 5__4; break; } 5__4 = 5__2 + 5__3; 2__current = 5__4; 1__state = 3; return true; } IEnumerator IEnumerable.GetEnumerator() { if (1__state == -2 && l__initialThreadId == Environment.CurrentManagedThreadId) { 1__state = 0; return this; } return new d__0(0); } IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)this).GetEnumerator(); void IEnumerator.Reset() => throw new NotSupportedException(); void IDisposable.Dispose() { } }

所有关于Fib的逻辑现在都在MoveNext方法中,但是作为跳转表的一部分,它允许实现分支到它上次离开的位置,这在枚举器类型上生成的状态字段中被跟踪。而我写的局部变量,如prev、next和sum,已经被 “提升 “为枚举器上的字段,这样它们就可以在调用MoveNext时持续存在。

 

(注意,前面的代码片段显示了C#编译器是如何发出实现的,它不会按原样编译。C#编译器合成了 “不可说 “的名字,这意味着它以一种有效的IL但无效的C#方式命名它所创建的类型和成员,这样就不会与任何用户命名的类型和成员冲突。我保留了编译器的所有名称,但如果你想尝试编译它,你可以使用有效的c#名称重命名。)

 

在我之前的例子中,我展示的最后一种枚举形式涉及手动使用IEnumerator。在那个层面上,我们手动调用MoveNext(),决定何时是重新进入循环程序的适当时机。但是……如果不这样调用它,而是让MoveNext的下一次调用实际成为异步操作完成时执行的延续工作的一部分呢?如果我可以yield返回一些代表异步操作的东西,并让消耗代码将continuation连接到该yield对象,然后在该continuation执行MoveNext时会怎么样?使用这种方法,我可以编写一个辅助方法:

static Task IterateAsync(IEnumerable tasks) { var tcs = new TaskCompletionSource(); IEnumerator e = tasks.GetEnumerator(); void Process() { try { if (e.MoveNext()) { e.Current.ContinueWith(t => Process()); return; } } catch (Exception e) { tcs.SetException(e); return; } tcs.SetResult(); }; Process(); return tcs.Task; }

 

现在变得有趣了。我们得到了一个可迭代的任务列表。每次我们MoveNext 到下一个Task 并获得一个时,我们将该任务的continuation连接起来;当这个Task 完成时,它只会回过头来调用执行MoveNext、获取下一个Task 的相同逻辑,以此类推。这是建立在将Task 作为任何异步操作的单一表示的思想之上的,所以我们输入的枚举表可以是一个任何异步操作的序列。这样的序列是从哪里来的呢?当然是通过迭代器。还记得我们之前的CopyStreamToStream例子吗,以及基于APM的实现是多么的光荣而可怕?考虑一下这个:

static Task CopyStreamToStreamAsync(Stream source, Stream destination) { return IterateAsync(Impl(source, destination)); static IEnumerable Impl(Stream source, Stream destination) { var buffer = new byte[0x1000]; while (true) { Task read = source.ReadAsync(buffer, 0, buffer.Length); yield return read; int numRead = read.Result; if (numRead Console.WriteLine(number.Value)); number.Value = 0; Console.ReadLine();

这段代码每次运行时都会打印42。在我们对委托进行排队之后,我们将AsyncLocal的值重置为0,这无关紧要,因为ExecutionContext是作为QueueUserWorkItem调用的一部分被捕获的,而该捕获包含了当时AsyncLocal的状态。我们可以通过实现我们自己的简单线程池来更详细地了解这一点:

using System.Collections.Concurrent; var number = new AsyncLocal(); number.Value = 42; MyThreadPool.QueueUserWorkItem(() => Console.WriteLine(number.Value)); number.Value = 0; Console.ReadLine(); class MyThreadPool { private static readonly BlockingCollection s_workItems = new(); public static void QueueUserWorkItem(Action workItem) { s_workItems.Add((workItem, ExecutionContext.Capture())); } static MyThreadPool() { for (int i = 0; i < Environment.ProcessorCount; i++) { new Thread(() => { while (true) { (Action action, ExecutionContext? ec) = s_workItems.Take(); if (ec is null) { action(); } else { ExecutionContext.Run(ec, s => ((Action)s!)(), action); } } }) { IsBackground = true }.UnsafeStart(); } } }

这里,MyThreadPool有一个BlockingCollection表示它的工作项队列,每个工作项都是要调用的工作的委托,以及与该工作相关联的ExecutionContext。池的静态构造函数启动一堆线程,每个线程都处于无限循环中,获取下一个工作项并运行它。如果没有为给定委托捕获ExecutionContext,该委托就会被直接调用。但如果捕获了一个ExecutionContext,我们不是直接调用委托,而是调用ExecutionContext.Run方法,该方法会在运行委托之前将提供的ExecutionContext恢复为当前上下文,然后再重置上下文。这个例子包含了与之前完全相同的AsyncLocal的代码,除了这次使用了MyThreadPool而不是ThreadPool,但它仍然每次输出42,因为池子里的ExecutionContext是正确流动的。

 

顺便说一句,你会注意到我在MyThreadPool的静态构造函数中调用了UnsafeStart。启动一个新的线程正是那种应该流向ExecutionContext的异步点,事实上,Thread的Start方法使用ExecutionContext.Capture来捕获当前的上下文,将其存储在Thread上,然后在最终调用Thread的ThreadStart委托时使用捕获的上下文。但我不想在这个例子中这样做,因为我不想让Threads在静态构造函数运行时捕获碰巧存在的任何ExecutionContext(这样做会使关于ExecutionContext的演示更加复杂),所以我用UnsafeStart方法代替。以Unsafe开头的线程相关方法与缺少Unsafe前缀的相应方法的行为完全相同,只是它们不捕获ExecutionContext,例如,Thread.Start和Thread.UnsafeStart的工作相同,但Start捕获ExecutionContext,UnsafeStart不捕获。

Back To Start

当我在写AsyncTaskMethodBuilder.Start的实现时,我们绕道讨论了ExecutionContext,我说这是有效的:

public void Start(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { stateMachine.MoveNext(); }

然后建议我简化一下。这种简化忽略了一个事实,即该方法实际上需要将ExecutionContext考虑在内,因此更像是这样:

public void Start(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { ExecutionContext previous = Thread.CurrentThread._executionContext; // [ThreadStatic] field try { stateMachine.MoveNext(); } finally { ExecutionContext.Restore(previous); // internal helper } }

这里不像我之前建议的那样只调用statemmachine .MoveNext(),而是在这里做了一个动作:获取当前的ExecutionContext,再调用MoveNext,然后在它完成时将当前上下文重置为调用MoveNext之前的状态。

 

这样做的原因是为了防止异步方法将环境数据泄露给调用者。一个示例方法说明了为什么这很重要:

async Task ElevateAsAdminAndRunAsync() { using (WindowsIdentity identity = LoginAdmin()) { using (WindowsImpersonationContext impersonatedUser = identity.Impersonate()) { await DoSensitiveWorkAsync(); } } }

“冒充”是将当前用户的环境信息改为其他人的;这让代码可以代表其他人,使用他们的特权和访问权限。在。net中,这种模拟跨异步操作流动,这意味着它是ExecutionContext的一部分。现在想象一下,如果Start没有恢复之前的上下文,考虑下面的代码:

Task t = ElevateAsAdminAndRunAsync(); PrintUser(); await t;

这段代码可以发现,ElevateAsAdminAndRunAsync中修改的ExecutionContext在ElevateAsAdminAndRunAsync返回到它的同步调用者之后仍然存在(这发生在该方法第一次等待尚未完成的内容时)。这是因为在调用Impersonate之后,我们调用了DoSensitiveWorkAsync并等待它返回的任务。假设任务没有完成,它将导致对ElevateAsAdminAndRunAsync的调用yield并返回到调用者,模拟仍然在当前线程上有效。这不是我们想要的。因此,Start设置了这个保护机制,以确保对ExecutionContext的任何修改都不会从同步方法调用中流出,而只会随着方法执行的任何后续工作一起流出。

 

MoveNext

因此,调用了入口点方法,初始化了状态机结构体,调用了Start,然后调用了MoveNext。什么是MoveNext?这个方法包含了开发者方法中所有的原始逻辑,但做了一大堆修改。让我们先看看这个方法的脚手架。下面是编译器为我们的方法生成的反编译版本,但删除了生成的try块中的所有内容:

private void MoveNext() { try { ... // all of the code from the CopyStreamToStreamAsync method body, but not exactly as it was written } catch (Exception exception) { 1__state = -2; 5__2 = null; t__builder.SetException(exception); return; } 1__state = -2; 5__2 = null; t__builder.SetResult(); }

 

无论MoveNext执行什么其他工作,当所有工作完成后,它都有责任完成async Task方法返回的任务。如果try代码块的主体抛出了未处理的异常,那么任务就会抛出该异常。如果async方法成功到达它的终点(相当于同步方法返回),它将成功完成返回的任务。在任何一种情况下,它都将设置状态机的状态以表示完成。(我有时听到开发人员从理论上说,当涉及到异常时,在第一个await之前抛出的异常和在第一个await之后抛出的异常是有区别的……基于上述,应该清楚情况并非如此。任何未在async方法中处理的异常,不管它在方法的什么位置,也不管方法是否产生了结果,都会在上面的catch块中结束,然后被捕获的异常会存储在async方法返回的任务中。)

 

还要注意,这个完成过程是通过构建器完成的,使用它的SetException和SetResult方法,这是编译器预期的构建器模式的一部分。如果async方法之前已经挂起了,那么构建器将不得不在挂起处理中创建一个Task (稍后我们会看到如何以及在哪里执行),在这种情况下,调用SetException/SetResult将完成该任务。然而,如果async方法之前没有挂起,那么我们还没有创建任务或向调用者返回任何东西,因此构建器在生成任务时有更大的灵活性。如果你还记得之前在入口点方法中,它做的最后一件事是将任务返回给调用者,它通过访问构建器的Task属性(我知道,有这么多东西被称为 “Task”)返回结果:

public Task CopyStreamToStreamAsync(Stream source, Stream destination) { ... return stateMachine.t__builder.Task; }

构建器知道该方法是否挂起过,如果挂起了,它就会返回已经创建的任务。如果方法从未挂起,而且构建器还没有任务,那么它可以在这里创建一个完成的任务。在这种情况下,在成功完成的情况下,它可以直接使用Task.CompletedTask而不是分配一个新的任务,避免任何分配。如果是一般的任务,构建者可以直接使用Task.FromResult(TResult result)。

构建器还可以对它创建的对象进行任何它认为合适的转换。例如,Task实际上有三种可能的最终状态:成功、失败和取消。AsyncTaskMethodBuilder的SetException方法处理特殊情况OperationCanceledException,将任务转换为TaskStatus。如果提供的异常是OperationCanceledException或源自OperationCanceledException,则将任务转换为TaskStatus.Canceled最终状态;否则,任务以TaskStatus.Faulted结束;这种区别在使用代码时往往不明显; 因为无论异常被标记为取消还是故障,都会被存储到Task中,等待该任务的代码将无法观察到状态之间的区别(无论哪种情况,原始异常都会被传播)…… 它只影响与任务直接交互的代码,例如通过ContinueWith,它具有重载,允许仅为完成状态的子集调用continuation。

现在我们了解了生命周期方面的内容,下面是在MoveNext的try块内填写的所有内容:

private void MoveNext() { try { int num = 1__state; TaskAwaiter awaiter; if (num != 0) { if (num != 1) { 5__2 = new byte[4096]; goto IL_008b; } awaiter = u__2; u__2 = default(TaskAwaiter); num = (1__state = -1); goto IL_00f0; } TaskAwaiter awaiter2 = u__1; u__1 = default(TaskAwaiter); num = (1__state = -1); IL_0084: awaiter2.GetResult(); IL_008b: awaiter = source.ReadAsync(5__2, 0, 5__2.Length).GetAwaiter(); if (!awaiter.IsCompleted) { num = (1__state = 1); u__2 = awaiter; t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); return; } IL_00f0: int result; if ((result = awaiter.GetResult()) != 0) { awaiter2 = destination.WriteAsync(5__2, 0, result).GetAwaiter(); if (!awaiter2.IsCompleted) { num = (1__state = 0); u__1 = awaiter2; t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref this); return; } goto IL_0084; } } catch (Exception exception) { 1__state = -2; 5__2 = null; t__builder.SetException(exception); return; } 1__state = -2; 5__2 = null; t__builder.SetResult(); }

这种复杂的情况可能感觉有点熟悉。还记得我们基于APM手动实现的BeginCopyStreamToStream有多复杂吗?这没有那么复杂,但也更好,因为编译器为我们做了这些工作,以延续传递的形式重写了方法,同时确保为这些延续保留了所有必要的状态。即便如此,我们也可以眯着眼睛跟着走。请记住,状态在入口点被初始化为-1。然后我们进入MoveNext,发现这个状态(现在存储在本地num中)既不是0也不是1,因此执行创建临时缓冲区的代码,然后跳转到标签IL_008b,在这里调用stream.ReadAsync。注意,在这一点上,我们仍然从调用MoveNext同步运行,因此从开始到入口点都同步运行,这意味着开发者的代码调用了CopyStreamToStreamAsync,它仍然在同步执行,还没有返回一个Task来表示这个方法的最终完成。这可能要改变了…

我们调用Stream.ReadAsync,从中得到一个Task。读取可能是同步完成的,也可能是异步完成的,但速度快到现在已经完成,也可能还没有完成。不管怎么说,我们有一个表示最终完成的Task,编译器发出的代码会检查该Task以决定如何继续:如果该Task确实已经完成(不管它是同步完成还是只是在我们检查时完成),那么这个方法的代码就可以继续同步运行……当我们可以在这里继续运行时,没有必要花不必要的开销排队处理该方法执行的剩余部分。但是为了处理Task还没有完成的情况,编译器需要发出代码来为Task挂上一个延续。因此,它需要发出代码,询问任务 “你完成了吗?” 它是否是直接与任务对话来问这个问题?

如果你在C#中唯一可以等待的东西是System.Threading.Tasks.Task,这将是一种限制。同样地,如果C#编译器必须知道每一种可能被等待的类型,那也是一种限制。相反,c#在这种情况下通常会做的是:它采用了一种api模式。代码可以等待任何公开适当模式(“awaiter”模式)的东西(就像您可以等待任何提供适当的“可枚举”模式的东西一样)。例如,我们可以增强前面写的MyTask类型来实现awaiter模式:

class MyTask { ... public MyTaskAwaiter GetAwaiter() => new MyTaskAwaiter { _task = this }; public struct MyTaskAwaiter : ICriticalNotifyCompletion { internal MyTask _task; public bool IsCompleted => _task._completed; public void OnCompleted(Action continuation) => _task.ContinueWith(_ => continuation()); public void UnsafeOnCompleted(Action continuation) => _task.ContinueWith(_ => continuation()); public void GetResult() => _task.Wait(); } }

如果一个类型公开了getwaiter()方法,就可以等待它,Task就是这样做的。这个方法需要返回一些内容,而这些内容又公开了几个成员,包括一个IsCompleted属性,用于在调用IsCompleted时检查操作是否已经完成。你可以看到正在发生的事情:在IL_008b,从ReadAsync返回的任务已经调用了getwaiter,然后在struct awaiter实例上完成访问。如果IsCompleted返回true,那么最终会执行到IL_00f0,在这里代码会调用awaiter的另一个成员:GetResult()。如果操作失败,GetResult()负责抛出异常,以便将其传播到async方法中的await之外;否则,GetResult()负责返回操作的结果(如果有的话)。在ReadAsync的例子中,如果结果为0,那么我们跳出读写循环,到方法的末尾调用SetResult,就完成了。

不过,回过头来看一下,真正有趣的部分是,如果IsCompleted检查实际上返回false,会发生什么。如果它返回true,我们就继续处理循环,类似于在APM模式中completedsynchronized返回true, Begin方法的调用者负责继续执行,而不是回调函数。但是如果IsCompleted返回false,我们需要暂停async方法的执行,直到await操作完成。这意味着从MoveNext中返回,因为这是Start的一部分,我们仍然在入口点方法中,这意味着将任务返回给调用者。但在发生任何事情之前,我们需要将continuation连接到正在等待的任务(注意,为了避免像在APM情况中那样的stack dives,如果异步操作在IsCompleted返回false后完成,但在我们连接continuation之前,continuation仍然需要从调用线程异步调用,因此它将进入队列)。由于我们可以等待任何东西,我们不能直接与任务实例对话;相反,我们需要通过一些基于模式的方法来执行此操作。

这是否意味着awaiter上有一个方法可以连接continuation?这是有道理的;毕竟,Task本身支持continuation,有一个ContinueWith方法等等……难道不是由GetAwaiter返回的TaskAwaiter公开了让我们设置continuation的方法吗?事实上,确实如此。awaiter模式要求awaiter实现INotifyCompletion接口,该接口包含一个方法void OnCompleted(操作延续)。awaiter还可以选择性地实现ICriticalNotifyCompletion接口,该接口继承了INotifyCompletion,并添加了一个无效的UnsafeOnCompleted(操作延续)方法。根据我们之前对ExecutionContext的讨论,你可以猜到这两个方法之间的区别是:都连接了continuation,但OnCompleted应该流向ExecutionContext,而UnsafeOnCompleted不需要。这主要是历史原因,与代码访问安全(CAS)有关。CAS在.NET Core中已经不存在了,在.NET Framework中默认是关闭的,只有当你选择重新启用遗留的部分信任功能时,它才会发挥作用。当使用部分信任时,CAS信息流作为ExecutionContext的一部分,因此不流动它是“不安全的”,这就是为什么不流动ExecutionContext的方法要加上“不安全的”前缀。这种方法也被归为[SecurityCritical],部分可信的代码不能调用[SecurityCritical]方法。因此,创建了两个OnCompleted的变体,如果提供,编译器更喜欢使用UnsafeOnCompleted,但OnCompleted变体总是自己提供,以防awaiter需要支持部分信任。然而,从异步方法的角度来看,编译器总是将ExecutionContext流过等待点,所以等待器也这样做是不必要的,而且是重复的工作。

Awaiter公开了一个方法来连接continuation。编译器可以直接使用它,除了一个非常关键的问题:continuation到底应该是什么?更重要的是,它应该与什么对象相关联?请记住,状态机结构体在栈上,我们当前运行的MoveNext调用是对该实例的方法调用。我们需要保存状态机,以便在恢复时我们拥有所有正确的状态,这意味着状态机不能一直存在于栈中;它需要被复制到堆上的某个地方,因为栈最终将被用于该线程执行的其他后续的、无关的工作。然后,延续需要在堆上的状态机副本上调用MoveNext方法。

此外,ExecutionContext也与此相关。状态机需要确保存储在ExecutionContext中的任何环境数据在暂停时被捕获,然后在恢复时被应用,这意味着延续也需要合并该ExecutionContext。因此,仅仅在状态机上创建一个指向MoveNext的委托是不够的。这也是我们不想要的开销。如果当我们挂起时,我们在状态机上创建了一个指向MoveNext的委托,那么每次这样做我们都要对状态机结构进行装箱(即使它已经作为其他对象的一部分在堆上)并分配一个额外的委托(委托的这个对象引用将是该结构体的一个新装箱的副本)。因此,我们需要做一个复杂的动作,即确保我们只在方法第一次暂停执行时将该结构从堆栈中提升到堆中,而在其他时候都使用相同的堆对象作为MoveNext的目标,并在这个过程中确保我们捕获了正确的上下文,并在恢复时确保我们使用捕获的上下文来调用该操作。

这比我们希望编译器发出的逻辑多得多……出于几个原因,我们希望它封装在一个helper中。第一,有很多复杂的代码要发送到每个用户的程序集中。第二,我们希望允许自定义该逻辑作为构建器模式实现的一部分(在后面讨论池时,我们将看到一个例子来解释为什么)。第三,我们希望能够发展和改进这种逻辑,并使现有的先前编译的二进制文件变得更好。这不是假设; 在 .net Core 2.1中,支持这一功能的库代码被彻底修改了,因此比在 .netFramewor上操作更加高效。我们将首先探索它在 .net Framework上是如何工作的,然后再看看现在在 .net Core中发生了什么。

你可以在c#编译器生成的代码中看到,当我们需要挂起时就会发生:

if (!awaiter.IsCompleted) // we need to suspend when IsCompleted is false { 1__state = 1; u__2 = awaiter; t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); return; }

我们将状态id存储到state字段中,该id表示当方法恢复时应该跳转到的位置。然后,我们将awaiter本身持久化到一个字段中,以便在恢复后可以使用它来调用GetResult。然后在返回MoveNext调用之前,我们要做的最后一件事是调用t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this),要求构建器为这个状态机连接一个continuation到awaiter。(注意,它调用构建器的AwaitUnsafeOnCompleted而不是构建器的AwaitOnCompleted,因为awaiter实现了iccriticalnotifycompletion;状态机处理流动的ExecutionContext,所以我们不需要awaiter也……正如前面提到的,这样做只会带来重复和不必要的开销。)

AwaitUnsafeOnCompleted方法的实现太复杂了,不能在这里复制,所以我将总结它在.NET Framework上的作用:

它使用Capture()来获取当前上下文。 然后它分配一个MoveNextRunner对象来包装捕获的上下文和装箱的状态机(如果这是该方法第一次挂起,我们还没有状态机,所以我们只使用null作为占位符)。 然后,它创建一个操作委托给该MoveNextRunner上的Run方法;这就是它如何能够获得一个委托,该委托将在捕获的ExecutionContext的上下文中调用状态机的MoveNext。 如果这是该方法第一次挂起,我们还没有装箱的状态机,所以此时它会将其装箱,通过将实例存储到本地类型的IAsyncStateMachine接口中,在堆上创建一个副本。然后,这个盒子会被存储到已分配的MoveNextRunner中。 现在到了一个有些令人费解的步骤。如果您查看状态机结构体的定义,它包含构建器,public AsyncTaskMethodBuilder t__builder;,如果你查看构建器的定义,它包含内部的IAsyncStateMachine m_stateMachine;。构建器需要引用装箱的状态机,以便在后续的挂起中它可以看到它已经装箱了状态机,并且不需要再次这样做。但是我们只是装箱了状态机,并且该状态机包含一个m_stateMachine字段为null的构建器。我们需要改变装箱状态机的构建器的m_stateMachine指向它的父容器。为了实现这一点,编译器生成的状态机结构体实现了IAsyncStateMachine接口,其中包括一个void SetStateMachine(IAsyncStateMachine stateMachine);方法,该状态机结构体包含了该接口方法的实现: private void SetStateMachine(IAsyncStateMachine stateMachine) => t__builder.SetStateMachine(stateMachine);

因此,构建器对状态机进行装箱,然后将装箱传递给装箱的SetStateMachine方法,该方法会调用构建器的SetStateMachine方法,将装箱存储到字段中。

最后,我们有一个表示continuation的Action,它被传递给awaiter的UnsafeOnCompleted方法。在TaskAwaiter的情况下,任务将将该操作存储到任务的continuation列表中,这样当任务完成时,它将调用该操作,通过Run回调,通过ExecutionContext.Run回调,最后调用状态机的MoveNext方法重新进入状态机,并从它停止的地方继续运行。

这就是在.net Framework中发生的事情,你可以在分析器中看到结果,例如通过运行分配分析器来查看每个await上的分配情况。让我们看看这个愚蠢的程序,我写这个程序只是为了强调其中涉及的分配成本:

using System.Threading; using System.Threading.Tasks; class Program { static async Task Main() { var al = new AsyncLocal() { Value = 42 }; for (int i = 0; i < 1000; i++) { await SomeMethodAsync(); } } static async Task SomeMethodAsync() { for (int i = 0; i < 1000; i++) { await Task.Yield(); } } }

这个程序创建了一个AsyncLocal,让值42通过所有后续的异步操作。然后它调用SomeMethodAsync 1000次,每次暂停/恢复1000次。在Visual Studio中,我使用 .NET Object Allocation Tracking profiler运行它,结果如下:

Image AllocationNetFramework

那是…很多的分配!让我们来研究一下它们的来源。

ExecutionContext。有超过一百万个这样的内容被分配。为什么?因为在 .net框架中,ExecutionContext是一个可变的数据结构。由于我们希望流转一个异步操作被fork时的数据,并且我们不希望它在fork之后看到执行的变更,我们需要复制ExecutionContext。每个单独的fork操作都需要这样的副本,因此有1000次对SomeMethodAsync的调用,每个调用都会暂停/恢复1000次,我们有100万个ExecutionContext实例。

Action。类似地,每次我们等待尚未完成的任务时(我们的百万个await Task.Yield()s就是这种情况),我们最终分配一个新的操作委托来传递给awaiter的UnsafeOnCompleted方法。

MoveNextRunner。同样的,有一百万个这样的例子,因为在前面的步骤大纲中,每次我们暂停时,我们都要分配一个新的MoveNextRunner来存储Action和ExecutionContext,以便使用后者来执行前者。

LogicalCallContext。另一个一百万。这些是.NET框架上AsyncLocal的实现细节;AsyncLocal将其数据存储到ExecutionContext的“逻辑调用上下文”中,这是表示与ExecutionContext一起流动的一般状态的一种奇特方式。如果我们要复制一百万个ExecutionContext,我们也会复制一百万个LogicalCallContext。

QueueUserWorkItemCallback。每个Task.Yield()都将一个工作项排队到线程池中,导致分配了100万个工作项对象用于表示这100万个操作。

Task< VoidResult >。这里有一千个这样的,所以至少我们脱离了”百万”俱乐部。每个异步完成的异步任务调用都需要分配一个新的Task实例来表示该调用的最终完成。

< SomeMethodAsync > d__1。这是编译器生成的状态机结构的盒子。1000个方法挂起,1000个盒子出现。

QueueSegment / IThreadPoolWorkItem[]。有几千个这样的方法,从技术上讲,它们与具体的异步方法无关,而是与线程池中的队列工作有关。在 .net框架中,线程池的队列是一个非循环段的链表。这些段不会被重用;对于长度为N的段,一旦N个工作项被加入到该段的队列中并从该段中退出,该段就会被丢弃并当作垃圾回收。

这就是. net框架。这是.NET Core:

Image AllocationNetCore

漂亮多了!对于. net框架上的这个示例,有超过500万次分配,总共分配了大约145MB的内存。对于 .net Core上的相同示例,只有大约1000个内存分配,总共只有大约109KB。为什么这么少?

ExecutionContext。在.net Core中,ExecutionContext现在是不可变的。这样做的缺点是,对上下文的每次更改,例如将值设置为AsyncLocal,都需要分配一个新的ExecutionContext。然而,好处是,流动的上下文比改变它更常见,而且由于ExecutionContext现在是不可变的,我们不再需要作为流动的一部分进行克隆。“捕获”上下文实际上就是从字段中读取它,而不是读取它并复制其内容。因此,流动不仅比变化更常见,而且更便宜。

LogicalCallContext。这在.NET Core中已经不存在了。在.NET Core中,ExecutionContext唯一存在的东西是AsyncLocal的存储。其他在ExecutionContext中有自己特殊位置的东西都是以AsyncLocal为模型的。例如,在.NET Framework中,模拟将作为SecurityContext的一部分流动,而SecurityContext是ExecutionContext的一部分;在.NET Core中,模拟通过AsyncLocal流动,它使用valueChangedHandler来对当前线程进行适当的更改。

QueueSegment / IThreadPoolWorkItem[]。在.net Core中,ThreadPool的全局队列现在被实现为ConcurrentQueue,而ConcurrentQueue已经被重写为一个由非固定大小的循环段组成的链表。一旦段的长度大到永远不会被填满因为稳态的出队列能够跟上稳态的入队列,就不需要再分配额外的段,相同的足够大的段就会被无休止地使用。

那么其他的分配呢,比如Action、MoveNextRunner和d__1?要理解剩余的分配是如何被移除的,需要深入了解它在.net Core上是如何工作的。

让我们回到讨论挂起时发生的事情:

if (!awaiter.IsCompleted) // we need to suspend when IsCompleted is false { 1__state = 1; u__2 = awaiter; t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); return; }

不管目标是哪个平台,这里发出的代码都是相同的,所以不管是.net Framework还是,为这个挂起生成的IL都是相同的。但是,改变的是AwaitUnsafeOnCompleted方法的实现,在.net Core中有很大的不同:

事情的开始是一样的:该方法调用Capture()来获取当前执行上下文。 然后,事情偏离了.net Framework。.NET Core中的builder只有一个字段: public struct AsyncTaskMethodBuilder { private Task? m_task; ... }

在捕获ExecutionContext之后,它检查m_task字段是否包含一个AsyncStateMachineBox的实例,其中TStateMachine是编译器生成的状态机结构体的类型。AsyncStateMachineBox类型是“魔法”。它的定义如下:

private class AsyncStateMachineBox : Task, IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine { private Action? _moveNextAction; public TStateMachine? StateMachine; public ExecutionContext? Context; ... }

与其说这是一个单独的Task,不如说这是一个任务(注意其基本类型)。该结构并没有对状态机进行装箱,而是作为该任务的强类型字段存在。我们不需要用单独的 MoveNextRunner 来存储 Action 和 ExecutionContext,它们只是这个类型的字段,而且由于这是存储在构建器的 m_task 字段中的实例,我们可以直接访问它,不需要在每次暂停时重新分配。如果ExecutionContext发生变化,我们可以用新的上下文覆盖该字段,而不需要分配其他东西;我们的任何Action仍然指向正确的地方。所以,在捕获了ExecutionContext之后,如果我们已经有了这个AsyncStateMachineBox的实例,这就不是这个方法第一次挂起了,我们可以直接把新捕获的ExecutionContext存储到其中。如果我们还没有一个AsyncStateMachineBox的实例,那么我们需要分配它:

var box = new AsyncStateMachineBox(); taskField = box; // important: this must be done before storing stateMachine into box.StateMachine! box.StateMachine = stateMachine; box.Context = currentContext;

请注意源注释为“重要”的那一行。这取代了.net框架中复杂的SetStateMachine,使得SetStateMachine在.net Core中根本没有使用。你看到的taskField有一个指向AsyncTaskMethodBuilder的m_task字段的ref。我们分配AsyncStateMachineBox< tstatemachinebox >,然后通过taskField将对象存储到构建器的m_task中(这是在栈上的状态机结构中的构建器),然后将基于堆栈的状态机(现在已经包含对盒子的引用)复制到基于堆的AsyncStateMachineBox< tstatemachinebox >中,这样AsyncStateMachineBox适当地并递归地结束引用自己。这仍然是令人费解的,但却是一种更有效的费解

然后,我们可以对这个Action上的一个方法进行操作,该方法将调用其MoveNext方法,该方法将在调用StateMachine的MoveNext之前执行适当的ExecutionContext恢复。该Action 可以缓存到_moveNextAction字段中,以便任何后续使用都可以重用相同的Action 。然后,该Action 被传递给awaiter的UnsafeOnCompleted来连接continuation。

 

它解释了为什么剩下的大部分分配都没有了:d__1没有被装箱,而是作为任务本身的一个字段存在,MoveNextRunner不再需要,因为它的存在只是为了存储Action和ExecutionContext。但是,根据这个解释,我们仍然应该看到1000个操作分配,每个方法调用一个,但我们没有。为什么?还有那些QueueUserWorkItemCallback对象呢?我们仍然在Task.Yield()中进行排队,为什么它们没有出现呢?

正如我所提到的,将实现细节推入核心库的好处之一是,它可以随着时间的推移改进实现,我们已经看到了它是如何从. net Framework发展到. net core的。它在最初为.net Core重写的基础上进一步发展,增加了额外的优化,这得益于对系统关键组件的内部访问。特别是,异步基础设施知道Task和TaskAwaiter等核心类型。而且因为它知道它们并具有内部访问权限,所以它不必遵循公开定义的规则。c#语言遵循的awaiter模式要求awaiter具有AwaitOnCompleted或AwaitUnsafeOnCompleted方法,这两个方法都将continuation作为一个操作,这意味着基础结构需要能够创建一个操作来表示continuation,以便与基础结构不知道的任意awaiter一起工作。但是,如果基础设施遇到它知道的awaiter,它没有义务采取相同的代码路径。对于System.Private中定义的所有核心awaiter。因此,CoreLib的基础设施可以遵循更简洁的路径,完全不需要操作。这些awaiter都知道IAsyncStateMachineBoxes,并且能够将box对象本身作为continuation。例如,Task返回的YieldAwaitable.Yield能够将IAsyncStateMachineBox本身作为工作项直接放入ThreadPool中,而等待任务时使用的TaskAwaiter能够将IAsyncStateMachineBox本身直接存储到任务的延续列表中。不需要操作,也不需要QueueUserWorkItemCallback。

因此,在非常常见的情况下,async方法只等待System.Private.CoreLib (Task, Task, ValueTask, ValueTask, YieldAwaitable,以及它们的ConfigureAwait变体),最坏的情况下,只有一次开销分配与async方法的整个生命周期相关:如果这个方法挂起了,它会分配一个单一的Task-derived类型来存储所有其他需要的状态,如果这个方法从来没有挂起,就不会产生额外的分配。

如果愿意,我们也可以去掉最后一个分配,至少以平摊的方式。如所示,有一个默认构建器与Task(AsyncTaskMethodBuilder)相关联,类似地,有一个默认构建器与任务 (AsyncTaskMethodBuilder)和ValueTask和ValueTask (AsyncValueTaskMethodBuilder和AsyncValueTaskMethodBuilder,分别)相关联。对于ValueTask/ValueTask,构造器实际上相当简单,因为它们本身只处理同步且成功完成的情况,在这种情况下,异步方法完成而不挂起,构建器可以只返回一个ValueTask.Completed或者一个包含结果值的ValueTask。对于其他所有事情,它们只是委托给AsyncTaskMethodBuilder/AsyncTaskMethodBuilder,因为ValueTask/ValueTask会被返回包装一个Task,它可以共享所有相同的逻辑。但是.NET 6 and C# 10引入了一个方法可以覆盖逐个方法使用的构建器的能力,并为ValueTask/ValueTask引入了几个专门的构建器,它们能够池化IValueTaskSource/IValueTaskSource对象来表示最终的完成,而不是使用Tasks。

我们可以在我们的样本中看到这一点的影响。稍微调整一下之前分析的SomeMethodAsync函数,让它返回ValueTask而不是Task:

static async ValueTask SomeMethodAsync() { for (int i = 0; i < 1000; i++) { await Task.Yield(); } }

这将生成以下入口点:

[AsyncStateMachine(typeof(d__1))] private static ValueTask SomeMethodAsync() { d__1 stateMachine = default; stateMachine.t__builder = AsyncValueTaskMethodBuilder.Create(); stateMachine.1__state = -1; stateMachine.t__builder.Start(ref stateMachine); return stateMachine.t__builder.Task; }

现在,我们添加[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]到SomeMethodAsync的声明中:

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] static async ValueTask SomeMethodAsync() { for (int i = 0; i < 1000; i++) { await Task.Yield(); } }

编译器输出如下:

[AsyncStateMachine(typeof(d__1))] [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] private static ValueTask SomeMethodAsync() { d__1 stateMachine = default; stateMachine.t__builder = PoolingAsyncValueTaskMethodBuilder.Create(); stateMachine.1__state = -1; stateMachine.t__builder.Start(ref stateMachine); return stateMachine.t__builder.Task; }

整个实现的实际c#代码生成,包括整个状态机(没有显示),几乎是相同的;唯一的区别是创建和存储的构建器的类型,因此在我们之前看到的任何引用构建器的地方都可以使用。如果你看一下 PoolingAsyncValueTaskMethodBuilder的代码,你会看到它的结构几乎与AsyncTaskMethodBuilder相同,包括使用一些完全相同的共享例程来做一些事情,如特殊套管已知的awaiter类型。关键的区别是,当方法第一次挂起时,它不是执行新的AsyncStateMachineBox(),而是执行StateMachineBox. rentfromcache(),并且在async方法(SomeMethodAsync)完成并等待返回的ValueTask完成时,租用的盒子会被返回到缓存中。这意味着(平摊)零分配:

Image AllocationNetCoreWithPooling

这个缓存本身有点意思。对象池可能是一个好主意,也可能是一个坏主意。创建一个对象的成本越高,共享它们的价值就越大;因此,例如,对非常大的数组进行池化比对非常小的数组进行池化更有价值,因为更大的数组不仅需要更多的CPU周期和内存访问为零,它们还会给垃圾收集器带来更大的压力,使其更频繁地收集垃圾。然而,对于非常小的对象,将它们池化可能会带来负面影响。池只是内存分配器,GC也是,所以当您使用池时,您是在权衡与一个分配器相关的成本与另一个分配器相关的成本,并且GC在处理大量微小的、生存期短的对象方面非常高效。如果你在对象的构造函数中做了很多工作,避免这些工作可以使分配器本身的开销相形见绌,从而使池变得有价值。但是,如果您在对象的构造函数中几乎没有做任何工作,并且将其进行池化,则您将打赌您的分配器(您的池)就所采用的访问模式而言比GC更有效,而这通常是一个糟糕的赌注。还涉及其他成本,在某些情况下,您可能最终会有效地对抗GC的启发式方法;例如,垃圾回收是基于一个前提进行优化的,即从较高代(如gen2)对象到较低代(如gen0)对象的引用相对较少,但池化对象可以使这些前提失效。

现在,由异步方法创建的对象并不小,而且它们可能在很热门的路径上,所以池化可能是合理的。但为了使它尽可能有价值,我们也想尽可能避免开销。因此,这个池子非常简单,选择让租借和返回的速度非常快,几乎没有竞争,即使这意味着它最终可能会比更积极地缓存更多的东西来分配。对于每种状态机类型,实现池化到每个线程一个状态机盒,每个核心一个状态机盒;这使得它能够以最小的开销和最小的竞争来租用和返回(没有其他线程可以同时访问线程专用缓存,也很少有其他线程同时访问内核专用缓存)。虽然这看起来是一个相对较小的池子,但它在显著减少稳态分配方面也相当有效,因为池子只负责存储当前不使用的对象;你可以在任何时候有一百万个异步方法都在运行,即使池子在每个线程和每个核心只能存储一个对象,它仍然可以避免丢弃大量对象,因为它只需要存储一个对象足够长的时间来把它从一个操作转移到另一个操作,而不是在它被该操作使用时。

 

SynchronizationContext 和 ConfigureAwait

我们之前在EAP模式的上下文中讨论过SynchronizationContext,并提到它将再次出现。SynchronizationContext使得调用可重用的辅助函数成为可能,并自动被调度回调用环境认为合适的任何地方。因此,我们很自然地认为async/await能“正常工作”,事实也的确如此。回到前面的按钮单击处理程序:

ThreadPool.QueueUserWorkItem(_ => { string message = ComputeMessage(); button1.BeginInvoke(() => { button1.Text = message; }); });

使用async/await,我们可以这样写:

button1.Text = await Task.Run(() => ComputeMessage());

对ComputeMessage的调用被转移到线程池中,这个方法执行完毕后,执行又转移回与按钮关联的UI线程,设置按钮的Text属性就是在这个线程中进行的。

与SynchronizationContext的集成由awaiter实现(为状态机生成的代码对SynchronizationContext一无所知),因为当所表示的异步操作完成时,是awaiter负责实际调用或将所提供的continuation排队。而自定义awaiter不需要考虑SynchronizationContext。目前,Task、Task、ValueTask、ValueTask的等待器都是do。这意味着,默认情况下,当你等待一个任务,一个Task,一个ValueTask,一个ValueTask,甚至Task. yield()调用的结果时,awaiter默认会查找当前的SynchronizationContext,如果它成功地获得了一个非默认的同步上下文,最终会将continuation排队到该上下文。

 

如果我们查看TaskAwaiter中涉及的代码,就可以看到这一点。以下是Corelib中的相关代码片段:

internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext) { if (continueOnCapturedContext) { SynchronizationContext? syncCtx = SynchronizationContext.Current; if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext)) { var tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, stateMachineBox.MoveNextAction, flowExecutionContext: false); if (!AddTaskContinuation(tc, addBeforeOthers: false)) { tc.Run(this, canInlineContinuationTask: false); } return; } else { TaskScheduler? scheduler = TaskScheduler.InternalCurrent; if (scheduler != null && scheduler != TaskScheduler.Default) { var tc = new TaskSchedulerAwaitTaskContinuation(scheduler, stateMachineBox.MoveNextAction, flowExecutionContext: false); if (!AddTaskContinuation(tc, addBeforeOthers: false)) { tc.Run(this, canInlineContinuationTask: false); } return; } } } ... }

这是一个方法的一部分,用于确定将哪个对象作为continuation存储到任务中。它被传递给stateMachineBox,如前所述,它可以直接存储到任务的continuation列表中。但是,这个特殊的逻辑可能会将IAsyncStateMachineBox封装起来,以合并一个调度程序(如果存在的话)。它检查当前是否有非默认的SynchronizationContext,如果有,它会创建一个SynchronizationContextAwaitTaskContinuation作为实际的对象,它会被存储为continuation;该对象依次包装了原始的和捕获的SynchronizationContext,并知道如何在与后者排队的工作项中调用前者的MoveNext。这就是如何在UI应用程序中作为事件处理程序的一部分等待,并在等待完成后让代码继续在正确的线程上运行。这里要注意的下一个有趣的事情是,它不仅仅关注一个SynchronizationContext:如果它找不到一个自定义的SynchronizationContext来使用,它还会查看Tasks使用的TaskScheduler类型是否有一个需要考虑的自定义类型。和SynchronizationContext一样,如果有一个非默认值,它就会和原始框一起包装在TaskSchedulerAwaitTaskContinuation中,用作continuation对象。

但这里最值得注意的可能是方法主体的第一行:if (continueOnCapturedContext)。我们只在continueOnCapturedContext为true时才对SynchronizationContext/TaskScheduler进行这些检查;如果这个值为false,实现方式就好像两者都是默认值一样,会忽略它们。请问是什么将continueOnCapturedContext设置为false?你可能已经猜到了:使用非常流行的ConfigureAwait(false)。

我在ConfigureAwait FAQ中详细讨论了ConfigureAwait所以我建议你阅读这篇文章以获取更多信息。可以这样说,作为await的一部分,ConfigureAwait(false)做的唯一一件事是将它的参数布尔值作为continueOnCapturedContext值提供给这个函数(以及其他类似的函数),以便跳过对SynchronizationContext/TaskScheduler的检查,表现得好像它们都不存在一样。对于进程来说,这允许Task 在它认为合适的地方调用其continuation,而不是强制将它们排队在某个特定的调度器上执行。

我之前提到过SynchronizationContext的另一个方面,我说过我们会再次看到它:OperationStarted/OperationCompleted。现在是时候了。这是人人都讨厌的特性:异步void。除了configureawait之外,async void可以说是async/await中最具争议性的特性之一。它被添加的原因只有一个:事件处理程序。在UI应用程序中,你可以编写如下代码:

button1.Click += async (sender, eventArgs) => { button1.Text = await Task.Run(() => ComputeMessage()); };

但如果所有的异步方法都必须有一个像Task这样的返回类型,你就不能这样做了。Click事件有一个签名public event EventHandler? Click;,其中EventHandler定义为public delegate void EventHandler(object? sender, EventArgs e);,因此要提供一个符合该签名的方法,该方法需要是void-returning。

有各种各样的理由认为async void是不好的,为什么文章建议尽可能避免使用它,以及为什么出现了各种 analyzers来标记使用async void。最大的问题之一是委托推理。考虑下面的程序:

using System.Diagnostics; Time(async () => { Console.WriteLine("Enter"); await Task.Delay(TimeSpan.FromSeconds(10)); Console.WriteLine("Exit"); }); static void Time(Action action) { Console.WriteLine("Timing..."); Stopwatch sw = Stopwatch.StartNew(); action(); Console.WriteLine($"...done timing: {sw.Elapsed}"); }

人们很容易期望它输出至少10秒的运行时间,但如果你运行它,你会发现输出是这样的:

Timing... Enter ...done timing: 00:00:00.0037550

嗯?当然,根据我们在这篇文章中讨论的所有内容,应该可以理解问题是什么。async lambda实际上是一个异步void方法。异步方法会在遇到第一个暂停点时返回调用者。如果这是一个异步Task 方法,Task 就会在这个时间点返回。但对于async void,什么都不会返回。Time方法只知道它调用了action();委托调用返回;它不知道async方法实际上仍在“运行”,并将在稍后异步完成。

这就是OperationStarted/OperationCompleted的作用。这种异步void方法本质上与前面讨论的EAP方法类似:这种方法的初始化是void,因此需要一些其他机制来跟踪所有此类操作。因此,EAP实现在操作启动时调用当前SynchronizationContext的OperationStarted,在操作完成时调用OperationCompleted, async void也做同样的事情。与async void相关的构建器是AsyncVoidMethodBuilder。还记得在async方法的入口,编译器生成的代码如何调用构建器的静态Create方法来获得适当的构建器实例吗?AsyncVoidMethodBuilder利用了这一点来挂钩创建和调用OperationStarted:

public static AsyncVoidMethodBuilder Create() { SynchronizationContext? sc = SynchronizationContext.Current; sc?.OperationStarted(); return new AsyncVoidMethodBuilder() { _synchronizationContext = sc }; }

类似地,当通过SetResult或SetException将构建器标记为完成时,它会调用相应的OperationCompleted方法。这就是像xunit这样的单元测试框架如何能够具有异步void测试方法,并仍然在并发测试执行中使用最大程度的并发,例如在xunit的AsyncTestSyncContext中。

有了这些知识,我们现在可以重写我们的timing示例:

using System.Diagnostics; Time(async () => { Console.WriteLine("Enter"); await Task.Delay(TimeSpan.FromSeconds(10)); Console.WriteLine("Exit"); }); static void Time(Action action) { var oldCtx = SynchronizationContext.Current; try { var newCtx = new CountdownContext(); SynchronizationContext.SetSynchronizationContext(newCtx); Console.WriteLine("Timing..."); Stopwatch sw = Stopwatch.StartNew(); action(); newCtx.SignalAndWait(); Console.WriteLine($"...done timing: {sw.Elapsed}"); } finally { SynchronizationContext.SetSynchronizationContext(oldCtx); } } sealed class CountdownContext : SynchronizationContext { private readonly ManualResetEventSlim _mres = new ManualResetEventSlim(false); private int _remaining = 1; public override void OperationStarted() => Interlocked.Increment(ref _remaining); public override void OperationCompleted() { if (Interlocked.Decrement(ref _remaining) == 0) { _mres.Set(); } } public void SignalAndWait() { OperationCompleted(); _mres.Wait(); } }

在这里,我已经创建了一个SynchronizationContext,它跟踪了一个待定操作的计数,并支持阻塞等待它们全部完成。当我运行它时,我得到这样的输出:

Timing... Enter Exit ...done timing: 00:00:10.0149074

然后!

State Machine Fields

至此,我们已经看到了生成的入口点方法,以及MoveNext实现中的一切是如何工作的。我们还了解了在状态机上定义的一些字段。让我们仔细看看这些。

对于前面给出的CopyStreamToStream方法:

public async Task CopyStreamToStreamAsync(Stream source, Stream destination) { var buffer = new byte[0x1000]; int numRead; while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0) { await destination.WriteAsync(buffer, 0, numRead); } }

下面是我们最终得到的字段:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; public Stream source; public Stream destination; private byte[] 5__2; private TaskAwaiter u__1; private TaskAwaiter u__2; ... }

这些都是什么?

< > 1 __state。是“状态机”中的“状态”。它定义了状态机所处的当前状态,最重要的是下次调用MoveNext时应该做什么。如果状态为-2,则操作完成。如果状态是-1,要么是我们第一次调用MoveNext,要么是MoveNext代码正在某个线程上运行。如果你正在调试一个async方法的处理过程,并且你看到状态为-1,这意味着在某处有某个线程正在执行包含在方法中的代码。如果状态大于等于0,方法会被挂起,状态的值会告诉你在什么时候挂起。虽然这不是一个严格的规则(某些代码模式可能会混淆编号),但通常情况下,分配的状态对应于从0开始的await编号,按照源代码从上到下的顺序排列。例如,如果async方法的函数体完全是:

await A(); await B(); await C(); await D();

你发现状态值是2,这几乎肯定意味着async方法当前被挂起,等待从C()返回的任务完成。

< > t__builder。这是状态机的构建器,例如用于Task的AsyncTaskMethodBuilder,用于ValueTask的AsyncValueTaskMethodBuilder,用于async void方法的AsyncVoidMethodBuilder,或用于async返回类型的AsyncMethodBuilder(…)]或通过async方法本身的属性覆盖的任何构建器。如前所述,构建器负责async方法的生命周期,包括创建return任务,最终完成该任务,并充当暂停的中介,async方法中的代码要求构建器暂停,直到特定的awaiter完成。

来源/目的地。这些是方法的参数。你可以看出,它们没有被篡改名字;编译器完全按照参数名称的指定来命名它们。如前所述,所有被方法主体使用的参数都需要被存储到状态机中,以便MoveNext方法能够访问它们。注意我说的是 “被使用”。如果编译器发现一个参数没有被异步方法的主体使用,它就可以优化,不需要存储这个字段。例如,给定下面的方法:

public async Task M(int someArgument) { await Task.Yield(); }

编译器会将这些字段发送到状态机:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; private YieldAwaitable.YieldAwaiter u__1; ... }

请注意,这里明显缺少名为someArgument的参数。但是,如果我们改变async方法,让它以任何方式使用实参:

public async Task M(int someArgument) { Console.WriteLine(someArgument); await Task.Yield(); }

它显示:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; public int someArgument; private YieldAwaitable.YieldAwaiter u__1; ... }

5__2;。这是缓冲区的 “局部”,它被提升为一个字段,这样它就可以在等待点上存活。编译器相当努力地防止状态被不必要地提升。注意,在源码中还有一个局部变量numRead,在状态机中没有相应的字段。为什么?因为它没有必要。这个局部变量被设置为ReadAsync调用的结果,然后被用作WriteAsync调用的输入。在这两者之间没有await,因此numRead的值需要被存储。。就像在一个同步方法中,JIT编译器可以选择将这样的值完全存储在一个寄存器中,而不会真正将其溢出到堆栈中,C#编译器可以避免将这个局部变量提升为一个字段,因为它不需要在任何等待中保存它的值。一般来说,如果C#编译器能够证明局部变量的值不需要在等待中保存,它就可以省略局部变量的提升。

u__1和u__2。async方法中有两个await:一个用于ReadAsync返回的Task,另一个用于WriteAsync返回的Task。Task. getawaiter()返回一个TaskAwaiter, Task. getawaiter()返回一个TaskAwaiter,两者都是不同的结构体类型。由于编译器需要在await (IsCompleted, UnsafeOnCompleted)之前获取这些awaiter,然后需要在await (GetResult)之后访问它们,因此需要存储这些awaiter。由于它们是不同的结构类型,编译器需要维护两个单独的字段来做到这一点(另一种选择是将它们装箱,并为awaiter提供一个对象字段,但这会导致额外的分配成本)。不过,编译器会尽可能地重复使用字段。如果我有:

public async Task M() { await Task.FromResult(1); await Task.FromResult(true); await Task.FromResult(2); await Task.FromResult(false); await Task.FromResult(3); }

有五个等待,但只涉及两种不同类型的等待者:三个是TaskAwaiter,两个是TaskAwaiter。因此,状态机上最终只有两个等待者字段:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; private TaskAwaiter u__1; private TaskAwaiter u__2; ... }

然后,如果我将我的示例改为:

public async Task M() { await Task.FromResult(1); await Task.FromResult(true); await Task.FromResult(2).ConfigureAwait(false); await Task.FromResult(false).ConfigureAwait(false); await Task.FromResult(3); }

仍然只涉及Tasks和Tasks,但实际上我使用了四个不同的struct awaiter类型,因为从ConfigureAwait返回的东西上的GetAwaiter()调用返回的awaiter与Task.GetAwaiter()返回的是不同的类型…从编译器创建的awaiter字段可以再次很明显的看出:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; private TaskAwaiter u__1; private TaskAwaiter u__2; private ConfiguredTaskAwaitable.ConfiguredTaskAwaiter u__3; private ConfiguredTaskAwaitable.ConfiguredTaskAwaiter u__4; ... }

如果您发现自己想要优化与异步状态机相关的大小,您可以查看的一件事是是否可以合并正在等待的事情,从而合并这些awaiter字段。

您可能还会看到在状态机上定义的其他类型的字段。值得注意的是,您可能会看到一些字段包含单词“wrap”。考虑下面这个愚蠢的例子:

public async Task M() => await Task.FromResult(42) + DateTime.Now.Second;

这将生成一个包含以下字段的状态机:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; private TaskAwaiter u__1; ... }

到目前为止没有什么特别的。现在颠倒一下添加表达式的顺序:

public async Task M() => DateTime.Now.Second + await Task.FromResult(42);

这样,你就得到了这些字段:

private struct d__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; private int 7__wrap1; private TaskAwaiter u__1; ... }

我们现在有了另一个函数:7__wrap1。为什么?因为我们计算了DateTime.Now的值。其次,只有在计算完它之后,我们才需要等待一些东西,并且需要保留第一个表达式的值,以便将其与第二个表达式的结果相加。因此,编译器需要确保第一个表达式的临时结果可以添加到await的结果中,这意味着它需要将表达式的结果溢出到临时中,它使用7__wrap1字段做到了这一点。如果你发现自己对异步方法的实现进行了超优化,以减少分配的内存量,你可以寻找这样的字段,并查看对源代码的微调是否可以避免溢出的需要,从而避免这种临时的需要。

总结

我希望这篇文章有助于解释当你使用async/await时背后到底发生了什么,但幸运的是,你通常不需要知道或关心。这里有很多变化,所有这些结合在一起,创建了一个高效的解决方案,可以编写可拓展的异步代码,而不必处理回调。然而,归根结底,这些部分实际上是相对简单的:任何异步操作的通用表示,一种能够将普通控制流重写为协程的状态机实现的语言和编译器,以及将它们绑定在一起的模式。其他一切都是优化的额外收获。

 

编程愉快!

 

 

 

 

Songjie Cai

Follow



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3