Rust:异步代码里的阻塞

您所在的位置:网站首页 read阻塞后怎么退出 Rust:异步代码里的阻塞

Rust:异步代码里的阻塞

2023-03-23 04:15| 来源: 网络整理| 查看: 265

大家好,我已经有段时间没写博客了,回来的感觉真好。重要的事情先说 - 这里有一些简讯。在Crossbeam上工作了两年之后,在2019年我已经将主要精力集中在异步编程方面,以此研究如何构建运行时(思考async-stdtokio)上。特别的,我想让异步运行时更高效、更健壮,并同时保持简单

在这篇博文里,我将就所有运行时都会面临的一个有趣的问题“在异步代码里调用阻塞函数”展开一点点讨论。

异步与同步

我们终于在稳定的Rust里拥有了 async/await,并已经准备重写所有以往写的同步类型的代码使之能异步执行。但是我们会吗?或者我们是不是应该这么做?我不知道。

同步库和异步库的分裂状态在不断扩大,比如 stdasync-std,两个看起来很像,但前者有阻塞函数(同步),后者有非阻塞函数(异步)。还有一对看起来很像的库 surfattohttpc:两个都是 http 客户端,但前者是异步的后者是同步的。新的库的作者现在必须面临一个窘境:他们是该提供同步的API,还是提供异步的API?又或者两个都提供。

从当下看这种不必要的API重复是比较尴尬的,但是我依然保持乐观,我相信最终我们会找到正确的方式来处理这个问题。不管怎样,我们必须找到能无缝集成同步代码和异步代码的方式。

从同步到异步(从异步到同步)

Rust里的main函数是同步的,所以为了从main函数进入异步的世界,我们需要明确地这样做。借助于async-std,我们可以通过调用 block_on() 函数从而进入异步世界:

use async_std::task; ​ // This is sync code. fn main() { task::block_on(foo()); } ​ // This is async code. async fn foo() {}

现在反过来,在异步代码里调用同步代码:

// This is async code. async fn foo() { bar(); } ​ // This is sync code. fn bar() {}

从异步到同步,我们不需要做任何额外的事情 - 我们只需要直接调用同步函数,仅此而已!简单。但是我们需要对那些需要执行比较长时间才完成的同步函数小心谨慎。我们并不能在异步的世界里,不假思索的去调用同步代码。

阻塞破坏并发

异步运行时操作中的一个核心假设是,每次futurepoll了,都会立刻返回Ready或者Pending。在异步代码中阻塞一段比较长的时间是不可接受的,这样的事永远不应该发生。

为了理解这一点,我们来编写一个使用surf来并发获取40个WEB页面的程序:

use async_std::task; use std::time::Instant; ​ // Fetch the HTML contents of a web page. async fn get(url: &str) -> String { surf::get(url).recv_string().await.unwrap() } ​ fn main() { task::block_on(async { let start = Instant::now(); let mut tasks = Vec::new(); ​ // Fetch the list of contributors for the first 40 minor Rust releases. for i in 0..40 { let url = format!("https://thanks.rust-lang.org/rust/1.{}.0/", i); ​ // Spawn a task fetching the list. tasks.push(task::spawn(async move { let html = get(&url).await; ​ // Display the number of contributors to this Rust release. for line in html.lines() { if line.contains("individuals") { println!("{}", line.trim()); } } })) } ​ // Wait for all tasks to complete. for t in tasks { t.await; } ​ // Display elapsed time. dbg!(start.elapsed()); }); }

在我的机器上,这个程序大概执行1.5秒就完成了。注意到get函数是异步的,我们能并发的获取40个WEB页面。

现在,让我们将get函数修改为同步的方式。我们把surf库替换为attohttpc库,一个功能很像的crate,区别是后者只提供同步接口:

async fn get(url: &str) -> String { attohttpc::get(url).send().unwrap().text().unwrap() }

毫无意外的,程序效率下降了并且大概需要执行3秒的时间。我的电脑有8个逻辑核心,意味着async-std executor能spawn 8个工作线程,所以同一时间只能拉取8个WEB页面了。

上述实践的意义在于揭示:阻塞破坏并发性。在异步的代码里不发生阻塞操作很重要,不然executor将无法有效的工作,相反时间都花在阻塞上了。

阻塞无处不在

我们已经看到在异步代码里发阻塞会影响性能。更准确的说,这个例子是我们可以构造出来的,当然你也可能简单的用surf替代attohttpc,问题自然而然解决。但坏消息是阻塞是个很狡猾的东西,它无处不在。它经常就在那些你甚至毫无感知的地方。

想一下标准输入和输出。很明显,从标准输入读取数据是阻塞的,所以你不能在异步代码里使用std::io::Stdin。但如果你看到了 println!(),你会皱起眉头吗?我打赌在大多时候,我们都会假设在标准输出里打印东西不会发生阻塞,但事实是,它确实会阻塞。

你也许会想为什么 println!() 会阻塞,假设我们要在shell里执行 program1 | program2,其中program1的输出会通过管道变为program2的输入。如果program2读输入读得很慢,当管道已经满了的时候program1又想打印点什么时,program1就必须阻塞了。

密集型计算也会引起阻塞。想想一下我们要对一个很大的Vec执行排序,进而调用v.sort()。假设排序需要花费大概一秒的时间,我们真应该考虑将其从异步executor中移除。

有时候还存在一些陷阱,程序员一不小心就会掉落其中。举个例子,假设我们在异步代码里使用rayon来调用v.par_sort()。很容易天真的以为这是ok的,因为在rayon'sexecutor里发生排序很正常,但事实是异步的executor会阻塞以等待rayon的结果。

性能低下并不是唯一需要担心的。如果async executor的每个线程都陷入诸如从标准输入读取的事情时,整个程序都有可能陷入死锁状态,无法再取得任何进展!

最后值得一提的是,即使是简单的内存访问也可能阻塞。举个例子:细想一下位于磁盘上的交换分区。如果一个线程正在访问交换分区,该线程将一直阻塞直到页数据从物理磁盘提取并移动到主存(内存)中。

所以阻塞是真的无处不在,并且很难将之从异步代码中清除。我想我们需要接受一个现实:阻塞总是存在于异步代码之中,无论我们多么仔细的想消灭它。

可能的解决方案

当我们期盼在异步代码中阻塞时,我们应该考虑将阻塞逻辑转移到一个不一样的线程池,这样executor将可以继续往下执行而无需等待。像async-stdtokio这类运行时都提供了函数spawn_blocking()来处理这个事情。

为了解释一下这个函数是如何被使用的,让我们看一下async-std库里的fs::read_to_string()是如何实现的:

async fn read_to_string(path: P) -> io::Result { let path = path.as_ref().to_owned(); spawn_blocking(move || std::fs::read_to_string(path)).await }

函数spawn_blocking() 将闭包spawn到一个专用的用于执行阻塞函数的线程池。异步executor就可以无需在等待闭包结果上阻塞了,相反只需要异步await返回的JoinHandle的结果即可。

请注意我们不能传递一个引用给闭包的path,因为在同步的函数read_to_string()执行完毕前,异步的函数read_to_string()可能被取消。不幸运的是,唯一的方式是克隆后传递给闭包的path。这有点低效同时也有点笨重。

幸运的是,tokio有一个替代方案来执行阻塞函数:可以就地执行闭包,并通知当前线程停止成为异步executor的一部分,然后将任务转移给一个新的线程。从某种角度上来说,该过程与spawn_blocking() - 将闭包发送给新线程然后继续event loop相反,它将event loop发送给新线程执行然后继续执行闭包。

以下是如何使用block_in_place()来实现异步版本的read_to_string()

async fn read_to_string(path: P) -> io::Result { block_in_place(|| std::fs::read_to_string(path)) }

请注意我们不再需要clone path,因为在内部同步的read_to_string()完成之前,外部异步的read_to_string()是不可能取消的。(注:一旦内部同步的read_to_string()开始了则意味着,外部异步的read_to_string()已经被poll了)

spawn_blocking()block_in_place()都能解决异步executor陷入阻塞代码的问题,但是两者有一个很重要的区别。spawn_blocking()是一个真正意义上的异步函数,因为它返回一个可以被awaitedfuture,而block_in_place()只是一个普通的同步函数。

为了明白为什么这个很重要,考虑如下代码:

let (s1, s2) = futures::join!(read_to_string("foo.txt"), read_to_string("bar.txt"));

如果read_to_string()是通过spawn_blocking()实现的,两个文件会被并行读取。然而如果是通过block_in_place()实现的,则两个文件会被串行读取(读完第一个再读第二个)

结论

关键要点:

异步代码里的阻塞会影响性能甚至导致死锁我们需要使用spawn_blocking() block_in_place()对阻塞代码进行隔离阻塞无处不在并且很难被完全隔离

更进一步,有时候很难说什么样的代码是阻塞的,什么样的代码是不阻塞的。如果一个函数需要花费一秒才完成,我们可能需要将之认为是阻塞的。但是如果花费一毫秒呢?可能依赖于具体的场景 - 有时候会被认为是阻塞的,有时候会被认为是非阻塞的。真的跟具体场景有关!

阻塞是吓人的,所以我们要在异步代码里进行防御性的隔离。然而我们能做的也只有这些了,阻塞将总是无可避免的侵入我们的异步代码里。这听起来是件挺让人伤心和失望的事,但我挺乐观的。我相信会出现比spawn_blocking()block_in_place()更好的方案,我将在接下来的博文中继续讲述。

翻译自:https://stjepang.github.io/2019/12/04/blocking-inside-async-code.html

翻译已经经过作者同意,翻译过程如有不合适或错误的,欢迎大家指正。

转载请注明出处。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3