Rust异步
Rust 只在语法层面上提供异步支持: Future + async/await,异步的 runtime 留给社区来实现,常见的 runtime 包括 tokio、async-std、smol 等
如下所示, Future 只有一个 poll
方法和一个绑定变量 Output
,其中 poll
方法返回值是一个 Poll<Self::Output>
,这是一个 enum
变量,包括 Ready
和 Pending
两种状态。
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
异步 runtime 通过 poll
方法来执行 Future
, 根据返回的 Poll
枚举来确定执行状态, 若执行完毕则返回 Ready
, 不然返回 Pending
, 异步 runtime 就会将该任务挂起等待下一次执行(通过 Context
中的 waker
来在特定时间重新唤醒任务)
手动实现一个 SleepFuture
#[tokio::main]
async fn main() {
println!("start in main");
SleepFuture::new(Duration::from_secs(1)).await;
println!("stop in main");
}
struct SleepFuture {
duration: Duration,
state: Arc<Mutex<State>>,
}
struct State {
waker: Option<Waker>,
inner_state: InnerState,
}
#[derive(PartialEq)]
enum InnerState {
Init,
Sleep,
Done,
}
impl SleepFuture {
fn new(duration: Duration) -> Self { Self { duration, state: Arc::new(Mutex::new(State{ waker: None, inner_state: InnerState::Init, }))}}
}
impl Future for SleepFuture {
type Output = ();
fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state_guard = self.state.lock().unwrap();
println!("pooling...");
if state_guard.inner_state == InnerState::Done {
return Poll::Ready(());
}
if state_guard.inner_state == InnerState::Init {
state_guard.waker = Some(cx.waker().clone());
state_guard.inner_state = InnerState::Sleeping;
let duration = self.duration;
let state_clone = self.state.clone();
thread::spawn(move || {
println!("start sleep...");
thread::sleep(duration);
let mut guard = state_clone.lock().unwrap();
guard.inner_state = InnerState::Done;
if let Some(waker) = guard.waker.take() {
waker.wake_by_ref()
}
println!("wakeup ...");
});
}
state_guard.waker = Some(cx.waker().clone());
Poll::Pending
}
}
async/await原理
async/await 是一个语法糖, async 是生成一个 Future(实际上是 generator), await 则进行 poll 轮询。
async 是一个实现了 Future
的状态机
利用上一节实现的 SleepFuture
来展示 .await
是如何工作的。
- 通过 async/await 来自动实现状态机
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
let s = String::from("hello");
let res = foo(v, s).await;
println!("{:?}", res);
}
async fn foo(v: Vec<i32>, s: String) -> u32 {
println!("{:?}", v);
sleep(Duration::from_secs(1)).await;
println!("\n{:?}", s);
sleep(Duration::from_secs(1)).await;
42
}
- 手动实现状态机
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
let s = String::from("hello");
let res = FooFut::new(v, s).await;
println!("{:?}", res);
}
手动实现 async 的效果,就是实现一个 Future。
// 执行 sleep 函数的时候,需要:1. 每个时刻的局部变量;2. 状态机
struct FooFut {
state: FooFutState,
v: Vec<i32>,
s: String,
}
// 对于 sleep 来说有 4 个状态
enum FooFutState {
Init,
Sleeping1(SleepFuture),
Sleeping2(SleepFuture),
Done,
}
impl FooFut {
pub fn new(v: Vec<i32>, s: String) -> Self {
Self {
state: FooFutState::Init,
v,
s,
}
}
}
impl Future for FooFut {
type Output = u32;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.as_mut().get_mut().state {
FooFutState::Init => {
println!("vector: {:?}", self.v);
let fut1 = SleepFuture::new(Duration::from_secs(1));
self.as_mut().state = Sleeping1(fut1);
}
FooFutState::Sleeping1(ref mut fut1) => match Pin::new(fut1).poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => {
println!("string: {:?}", self.s);
let fut2 = SleepFuture::new(Duration::from_secs(2));
self.state = FooFutState::Sleeping2(fut2);
}
},
FooFutState::Sleeping2(ref mut fut2) => match Pin::new(fut2).poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(_) => {
self.state = FooFutState::Done;
}
},
FooFutState::Done => return Poll::Ready(42),
}
}
}
}