Rust 异步与取消

您所在的位置:网站首页 rust自动关门怎么取消 Rust 异步与取消

Rust 异步与取消

2024-07-16 15:11| 来源: 网络整理| 查看: 265

Rust 异步与取消

作者: 王江桐、周紫鹏

在 Rust 中,异步原语可分为两类:

Future,是 Rust 语言以及标准库的一部分,实现了 Future trait 的实例可以调用 await,试图获取异步任务执行的状态,在运行时的调度下,Future 的实际代码逻辑会转化为 Task(任务); Task(任务)则和 Future 相反,并不是 Rust 语言以及标准库的一部分,同时在一般的异步中,也不是用户直接接触到的部分。异步执行逻辑组成 Task 单元,由异步运行时管理,可以在多线程上运行。执行器在多线程中执行、移动以及调度 Task,Task 返回 Future,当任务执行完成时,用户通过 Future 获取任务执行的返回值。

在很多其它语言中,例如 JavaScript,这些语言处理异步时,使用的是类如 Task 的任务单元,并且不提供类如 Future 的结构。语言自带的运行时则会优化任务的调度以及运行。Rust 则相反,并没有自带运行时,用户接触到的是 Future 而非 Task。运行时以及任务调度则由三方库提供,例如 tokio 以及 asnyc-std。

在一些场景下,我们可能需要取消已生成的实现 Future trait 的实例。一个不完全贴切的例子是,如果出于某些原因,我们误生成了下载动作,以及相关的后续逻辑,那么我们应当有方法取消这一误操作产生的结果。

取消 Futures

不考虑async drop的情况下,不论是取消单一的 Future 还是一整个 Future 树,或是取消 await 之前或之后的 Future,取消都非常简单,只需要调用 drop 函数即可,Rust 标准库会处理后续的流程。

Rust 语言组还没有实现 async drop。关于这一点,可见《This Week in Rust #412:Rust Async trait更新与多线程》。

举例来说,对于取消调用 await 之前的 Future:

use std::time::Duration; struct Guard; impl Drop for Guard { fn drop(&mut self) { println!("2"); } } async fn foo(guard: Guard) { println!("3"); task::sleep(Duration::from_secs(1)).await; println!("4"); } fn main() { println!("1"); let guard = Guard {}; let fut = foo(guard); // 取消 drop(fut); println!("done"); }

打印结果是:

> 1 > 2 > done

取消调用了一次 await 的 Future:

#![allow(unused)] fn main() { use std::{ptr, task}; use async_std::task; use std::time::Duration; async fn foo() { println!("2"); task::sleep(Duration::from_secs(1)).await; println!("3"); } let mut fut = Box::pin(foo()); let mut cx = empty_cx(); println!("1"); assert!(fut.as_mut().poll(&mut cx).is_pending()); drop(fut); println!("done"); /// Create an empty "Waker" callback, wrapped in a "Context" structure. /// How this works is not particularly important for the rest of this post. fn empty_cx() -> task::Context { ... } }

打印结果是:

> 1 > 2 > done

在 Future 逻辑的两个 await 之间,或是在最后一个 await 调用之后,无法取消 Future。在这种情况下,没有 await,即没有异步逻辑,Future 的内部逻辑执行没有暂停点,会持续执行直到通过 await 暂停或返回。

取消 Future 树:

#![allow(unused)] fn main() { use async_std::prelude::*; use async_std::task; use std::time::Duration; async fn foo() { println!("2"); bar().timeout(Duration::from_secs(3)).await; println!("5"); } async fn bar() { println!("3"); task::sleep(Duration::from_secs(2)).await; println!("4"); } println!("1"); foo().timeout(Duration::from_secs(1)).await; println!("done"); }

对于 Future 树:

main -> foo (times out after 1 sec) -> bar (times out after 3 secs) -> task::sleep (wait for 2 secs)

取消后的打印结果为:

> 1 > 2 > 3 > done # `4` and `5` are never printed

虽然在大部分情况下,当实例理论上退出生命周期时,Rust 都会调用它们的析构函数,但是在一些场景下,析构函数并不能保证被调用,例如引用循环以及多线程中使用 Rc 。在其它语言,例如 Swift 和 Go 中,语言层级上的解决方案是使用 defer 关键字。这一关键字可以保证匿名析构函数一定会被调用。这部分的解析可见《The defer keyword in Swift: try/finally done right》。在 Rust 中,目前则暂时没有运行时直接回应这个问题,Rust 的三方库 scopeguard 提供了一个类似于 defer 的宏,以及相应的套件。 使用 defer 会生成析构函数运行保证器,不过它会直接获取数据的所有权。如果我们只希望当 Future 中途取消时,在退出的时候才将所有权移交,即在此之前所有权应当由用户管理,而不是保证器的话,那么可以使用 scopeguard::SopeGuard,然后通过 Deref / DerefMut trait 来获取其中数据。

取消 Task

Task 并不是 Rust 标准化的实例,因此无法通过 drop 或其他非常直观的手段暂停:

#![allow(unused)] fn main() { // 以 async-std 运行时为例 use async_std::task; let handle = task::spawn(async { task::sleep(Duration::from_secs(1)).await; println!("2"); }); println!("1"); drop(handle); // Drop the task handle. task::sleep(Duration::from_secs(2)).await; println!("done"); }

打印结果为:

> 1 > 2 > done

可以看到,drop handle 并不会取消异步 Task 的运行。对于 Task,async-std 运行时的逻辑与 tokio 相同。生成的异步任务与返回的 handle 无关,handle 生命周期结束并不会连带结束它所指向的 Task,而是使得这一 Task 与 handle 分离。这一语义被称为 "detach on drop",即析构时分离。

一般来说,针对这种情况,不同的运行时会提供不同的处理方式。例如,async-std 使用 JoinHandle::cancel,tokio 使用 JoinHandle::abort 来终止单一 Task 的运行:

#![allow(unused)] fn main() { use async_std::task; let handle = task::spawn(async { task::sleep(Duration::from_secs(1)).await; println!("2"); }); println!("1"); handle.cancel().await; // Cancel the task handle task::sleep(Duration::from_secs(2)).await; println!("done"); }

打印结果为:

> 1 > done

然而,由于 Task 的取消并不能通过析构 handle 来实现,取消 Task 树并不像取消 Future 树,会自动向下传递取消行为。逻辑上,使用取消单一 Task 的方式可能可以成功取消根 Task,但是这可能导致树中的其他 Task 悬垂,并继续运行,而非取消执行并结束生命周期。

在三方库运行时中,smol 运行时会自动传递 Task 树的取消:

#![allow(unused)] fn main() { smol::block_on(async { println!("1"); let task = smol::spawn(async { println!("2"); }); drop(task); println!("done") }); }

打印结果为:

> 1 > done

然而,并不是所有运行时都像 smol 一样会自动传递 Task 树的取消。对于其他的一些运行时,如果要传递这一行为,用户可能要考虑手动写一个包装器(Wrapper),确保调用 drop 时,所有指向树中对应 Task 的 handle 都会调用 abort:

#![allow(unused)] fn main() { use tokio::task; use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; /// Spawn a new tokio Task and cancel it on drop. pub fn spawn(future: T) -> Wrapper where T: Future + Send + 'static, T::Output: Send + 'static, { Wrapper(task::spawn(future)) } /// Cancels the wrapped tokio Task on Drop. pub struct Wrapper(task::JoinHandle); impl Future for Wrapper{ type Output = Result; fn poll(mut self: Pin, cx: &mut Context


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3