Rust 异步原理

 

Rust异步

Rust 只在语法层面上提供异步支持: Future + async/await,异步的 runtime 留给社区来实现,常见的 runtime 包括 tokio、async-std、smol 等

如下所示, Future 只有一个 poll 方法和一个绑定变量 Output,其中 poll 方法返回值是一个 Poll<Self::Output>,这是一个 enum 变量,包括 ReadyPending 两种状态。

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}

异步 runtime 通过 poll 方法来执行 Future, 根据返回的 Poll 枚举来确定执行状态, 若执行完毕则返回 Ready, 不然返回 Pending, 异步 runtime 就会将该任务挂起等待下一次执行(通过 Context 中的 waker 来在特定时间重新唤醒任务)

手动实现一个 SleepFuture

#[tokio::main]
async fn main() {
    println!("start in main");
    SleepFuture::new(Duration::from_secs(1)).await;
    println!("stop in main");
}
struct SleepFuture {
    duration: Duration,
    state: Arc<Mutex<State>>,
}

struct State {
    waker: Option<Waker>,
    inner_state: InnerState,
}

#[derive(PartialEq)]
enum InnerState {
    Init,
    Sleep,
    Done,
}

impl SleepFuture {
    fn new(duration: Duration) -> Self { Self { duration, state: Arc::new(Mutex::new(State{ waker: None, inner_state: InnerState::Init, }))}}
}

impl Future for SleepFuture {
    type Output = ();
    fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_guard = self.state.lock().unwrap();
        println!("pooling...");

        if state_guard.inner_state == InnerState::Done {
            return Poll::Ready(());
        }

        if state_guard.inner_state == InnerState::Init {
            state_guard.waker = Some(cx.waker().clone());
            state_guard.inner_state = InnerState::Sleeping;
            let duration = self.duration;
            let state_clone = self.state.clone();
            thread::spawn(move || {
                println!("start sleep...");
                thread::sleep(duration);
                let mut guard = state_clone.lock().unwrap();
                guard.inner_state = InnerState::Done;
                if let Some(waker) = guard.waker.take() {
                    waker.wake_by_ref()
                }
                println!("wakeup ...");
            });
        }
        
        state_guard.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

async/await原理

async/await 是一个语法糖, async 是生成一个 Future(实际上是 generator), await 则进行 poll 轮询。 async 是一个实现了 Future 的状态机

利用上一节实现的 SleepFuture 来展示 .await 是如何工作的。

  1. 通过 async/await 来自动实现状态机
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    let s = String::from("hello");

    let res = foo(v, s).await;
    println!("{:?}", res);
}
async fn foo(v: Vec<i32>, s: String) -> u32 {
    println!("{:?}", v);
    sleep(Duration::from_secs(1)).await;
    println!("\n{:?}", s);
    sleep(Duration::from_secs(1)).await;
    42
}
  1. 手动实现状态机
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];
    let s = String::from("hello");

    let res = FooFut::new(v, s).await;
    println!("{:?}", res);
}

手动实现 async 的效果,就是实现一个 Future。

// 执行 sleep 函数的时候,需要:1. 每个时刻的局部变量;2. 状态机 
struct FooFut {
    state: FooFutState,
    v: Vec<i32>,
    s: String,
}

// 对于 sleep 来说有 4 个状态
enum FooFutState {
    Init,
    Sleeping1(SleepFuture),
    Sleeping2(SleepFuture),
    Done,
}

impl FooFut {
    pub fn new(v: Vec<i32>, s: String) -> Self {
        Self {
            state: FooFutState::Init,
            v,
            s,
        }
    }
}

impl Future for FooFut {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match self.as_mut().get_mut().state {
                FooFutState::Init => {
                    println!("vector: {:?}", self.v);
                    let fut1 = SleepFuture::new(Duration::from_secs(1));
                    self.as_mut().state = Sleeping1(fut1);
                }
                FooFutState::Sleeping1(ref mut fut1) => match Pin::new(fut1).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(_) => {
                        println!("string: {:?}", self.s);
                        let fut2 = SleepFuture::new(Duration::from_secs(2));
                        self.state = FooFutState::Sleeping2(fut2);
                    }
                },
                FooFutState::Sleeping2(ref mut fut2) => match Pin::new(fut2).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(_) => {
                        self.state = FooFutState::Done;
                    }
                },
                FooFutState::Done => return Poll::Ready(42),
            }
        }
    }
}

参考

Rust 异步编程 Future 理解,手写一个 Future 示例