async await 异步编程

async 编程模型与其他模型的区别

Future

Future 是Rust异步编程的语意核心, 它是一个能产生异步值的。(类比Javascript的Promise)

Rust展开
123456789101112131415
enum Poll<T> {
    Ready(T),
    Pending,
}

struct Context<'a> {
    waker: &'Waker,
    _maker: PhantomData<fn (&'a ()) -> &'a()>
}


trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, context: &Context<'_>) -> Poll<Self::Output>
}

Future 需要被执行器轮询(poll)后才会运行, 通过调用这个方法,可以将Future向前推进,直到被切走,但并不保证它可以在一次poll中完成。

如果Future完成了,则返回 Poll::Ready(T), 如果没有完成则返回 Poll::Pending , 并且设定一个wake函数, 当Future准备好再次执行时,这个函数会被调用 然后管理该Future的执行器会再次调用poll函数,使得Future能继续推进。

Context 可以记录Future的状态,

Context 中的Waker实现了wake方法, 它用于唤醒一个Future

Rust展开
1234567891011121314151617181920
impl Waker {
    /// Wake up the task associated with this `Waker`.
    #[inline]
    #[stable(feature = "futures_api", since = "1.36.0")]
    pub fn wake(self) {
        // The actual wakeup call is delegated through a virtual function call
        // to the implementation which is defined by the executor.
        let wake = self.waker.vtable.wake;
        let data = self.waker.data;

        // Don't call `drop` -- the waker will be consumed by `wake`.
        crate::mem::forget(self);

        // SAFETY: This is safe because `Waker::from_raw` is the only way
        // to initialize `wake` and `data` requiring the user to acknowledge
        // that the contract of `RawWaker` is upheld.
        unsafe { (wake)(data) };
    }
    ...
}

对于Future来说, 第一次poll的时候没有完成是很正常的事情,但需要确保后续它Ready的时候,执行器能可以再次调度它, 这个过程是通过Waker完成的。

如果没有wake函数,则执行器不知道某个Future是否可以继续执行,除非执行器轮询每一个Future,但这样效率较低。有了 wake Future可以主动通知执行器, 然后执行器可以精确的执行这个Future

实现一个简单的定时器

Rust展开
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
use std:: {
    future::Future,
    sync::{Arc, Mutex, mpsc::{SyncSender, sync_channel, Receiver}},
    task::{ Context, Poll, Waker },
    thread,
    time::Duration,
    marker::Send
};

use futures::{future::BoxFuture, FutureExt, task::{ArcWake, waker_ref} };

struct ShareData {
    completed: bool,
    waker: Option<Waker>,
}

struct  TimerFuture {
    data: Arc<Mutex<ShareData>>
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut share_data = self.data.lock().unwrap();
        if share_data.completed {
           Poll::Ready(())
        }else{
            share_data.waker = Some(cx.waker().clone());
           Poll::Pending
        }
    }
}

impl TimerFuture {
    fn new(dur: Duration)->Self {
        let share_data = Arc::new(Mutex::new(ShareData{
            completed: false,
            waker: None
        }));
        let thread_share_data = share_data.clone();
        thread::spawn(move || {
            thread::sleep(dur);
            let mut data = thread_share_data.lock().unwrap();
            data.completed = true;
            if let Some(waker) = data.waker.take() {
                waker.wake();
            }
        });

        TimerFuture { data: share_data }
    }
}

struct  Exector {
    ready_queue: Receiver<Arc<Task>>
}

struct Spwaner {
    task_sender: SyncSender<Arc<Task>>
}
impl Spwaner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send){
        let future = future.boxed();
        let task = Arc::new(Task{
            future: Mutex::new(Some(future)),
            sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("task is full");
    }
}

struct Task {
    future:  Mutex<Option<BoxFuture<'static, ()>>>,
    sender: SyncSender<Arc<Task>>
} 
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = arc_self.clone();
        arc_self.sender.send(cloned).expect("oops!, task is full");
    }
}

impl Exector {
    fn run(&self){
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                let waker = waker_ref(&task);
                let context = &mut  Context::from_waker(&*waker);
                if future.as_mut().poll(context).is_pending() {
                    *future_slot = Some(future);
                }
            }
        }
    }
}

fn new_exector_and_spwanr ()-> (Exector, Spwaner) {
   
    const MAX_QUEUE : usize= 2;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUE);
    return (Exector{ ready_queue }, Spwaner{ task_sender }); 
}

fn main(){
    let (exector, spwaner) = new_exector_and_spwanr();

    spwaner.spawn(async {
        println!("Hey1");
        TimerFuture::new(Duration::from_secs(1)).await; 
        println!("done2");
    });
    spwaner.spawn(async {
        println!("Hey2");
        TimerFuture::new(Duration::from_secs(1)).await; 
        println!("done2");
    });

    drop(spwaner);

    exector.run();
}
Rust展开
12345678
Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 2.04s
     Running `target/debug/playground`

Hey1
Hey2
done2
done2

- roadup -