async await 异步编程
async 编程模型与其他模型的区别
Future 是Rust异步编程的语意核心, 它是一个能产生异步值的。(类比Javascript的Promise)
Rust展开
123456789101112131415
enum Poll<T> {
Ready(T),
Pending,
}
struct Context<'a> {
waker: &'Waker,
_maker: PhantomData<fn (&'a ()) -> &'a()>
}
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, context: &Context<'_>) -> Poll<Self::Output>
}
Future 需要被执行器轮询(poll)后才会运行, 通过调用这个方法,可以将Future向前推进,直到被切走,但并不保证它可以在一次poll中完成。
如果Future完成了,则返回 Poll::Ready(T), 如果没有完成则返回 Poll::Pending , 并且设定一个wake函数, 当Future准备好再次执行时,这个函数会被调用 然后管理该Future的执行器会再次调用poll函数,使得Future能继续推进。
Context 可以记录Future的状态,
Context 中的Waker实现了wake方法, 它用于唤醒一个Future
Rust展开
1234567891011121314151617181920
impl Waker {
/// Wake up the task associated with this `Waker`.
#[inline]
#[stable(feature = "futures_api", since = "1.36.0")]
pub fn wake(self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
let wake = self.waker.vtable.wake;
let data = self.waker.data;
// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);
// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `wake` and `data` requiring the user to acknowledge
// that the contract of `RawWaker` is upheld.
unsafe { (wake)(data) };
}
...
}
对于Future来说, 第一次poll的时候没有完成是很正常的事情,但需要确保后续它Ready的时候,执行器能可以再次调度它, 这个过程是通过Waker完成的。
如果没有wake函数,则执行器不知道某个Future是否可以继续执行,除非执行器轮询每一个Future,但这样效率较低。有了 wake Future可以主动通知执行器, 然后执行器可以精确的执行这个Future。
实现一个简单的定时器:
Rust展开
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
use std:: {
future::Future,
sync::{Arc, Mutex, mpsc::{SyncSender, sync_channel, Receiver}},
task::{ Context, Poll, Waker },
thread,
time::Duration,
marker::Send
};
use futures::{future::BoxFuture, FutureExt, task::{ArcWake, waker_ref} };
struct ShareData {
completed: bool,
waker: Option<Waker>,
}
struct TimerFuture {
data: Arc<Mutex<ShareData>>
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut share_data = self.data.lock().unwrap();
if share_data.completed {
Poll::Ready(())
}else{
share_data.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
fn new(dur: Duration)->Self {
let share_data = Arc::new(Mutex::new(ShareData{
completed: false,
waker: None
}));
let thread_share_data = share_data.clone();
thread::spawn(move || {
thread::sleep(dur);
let mut data = thread_share_data.lock().unwrap();
data.completed = true;
if let Some(waker) = data.waker.take() {
waker.wake();
}
});
TimerFuture { data: share_data }
}
}
struct Exector {
ready_queue: Receiver<Arc<Task>>
}
struct Spwaner {
task_sender: SyncSender<Arc<Task>>
}
impl Spwaner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send){
let future = future.boxed();
let task = Arc::new(Task{
future: Mutex::new(Some(future)),
sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("task is full");
}
}
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
sender: SyncSender<Arc<Task>>
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self.sender.send(cloned).expect("oops!, task is full");
}
}
impl Exector {
fn run(&self){
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
if future.as_mut().poll(context).is_pending() {
*future_slot = Some(future);
}
}
}
}
}
fn new_exector_and_spwanr ()-> (Exector, Spwaner) {
const MAX_QUEUE : usize= 2;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUE);
return (Exector{ ready_queue }, Spwaner{ task_sender });
}
fn main(){
let (exector, spwaner) = new_exector_and_spwanr();
spwaner.spawn(async {
println!("Hey1");
TimerFuture::new(Duration::from_secs(1)).await;
println!("done2");
});
spwaner.spawn(async {
println!("Hey2");
TimerFuture::new(Duration::from_secs(1)).await;
println!("done2");
});
drop(spwaner);
exector.run();
}
Rust展开
12345678
Compiling playground v0.0.1 (/playground)
Finished dev [unoptimized + debuginfo] target(s) in 2.04s
Running `target/debug/playground`
Hey1
Hey2
done2
done2