Rust加入原生异步支持的时间相当晚(当然比C++标准委员会动作快多了),所以rust中的异步编程看上去并不像javascript那么自然,除了几乎必然要接触到tokio这类第三方库外,语言本身像是Unpin
、Future
这类概念也是拦路虎。这里我们从异步尚不存在的rust出发,看看这栋满是补丁的异步破房子是怎么被修建起来的。
Poll和Future: 最简单的原语
设想我们现在还只有同步代码,要怎么至少“看上去异步”地处理一些IO操作呢?这个问题其实在过去早有答案,linux的poll
、epoll
,windows的iocp
,macos的kqueue
实现都打好了样板。我们可以参考linux的poll
写一段这样的代码
trait Future {
type Output;
fn poll(&mut self) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
fn await_call<T>(mut async_block: impl Future<Output = T>) -> T {
loop {
match async_block.poll() {
Poll::Ready(res) => return res,
Poll::Pending => continue
}
}
}
Future
要求实现一个poll
函数,当异步任务执行完成时返回Poll::Ready
,否则返回Poll::Pending
。上层的await_call
则不断poll
这个函数,直至它返回Poll::Ready
。
这个模型很基础,但存在一个显而易见的问题:和其蓝本一样,在死循环里不断调一个函数显然太耗能了。为此我们可以要求poll
的声明里加入一个回调函数,让Future
在底层数据准备好的时候通过回调函数发回一个信号,唤醒await_call
再去调用Future::poll
。
在这个模型的基础上可以进一步封装Join
和Then
:
struct Join<A, B, U, V>
where
A: Future<Output = U>,
B: Future<Output = V>
{
a: Option<A>,
b: Option<B>,
output_a: MaybeUninit<U>,
output_b: MaybeUninit<V>,
}
impl<A, B, U, V> Future for Join<A, B, U, V>
where
A: Future<Output = U>,
B: Future<Output = V>
{
type Output = (U, V);
fn poll(&mut self, callback: fn()) -> Poll<Self::Output> {
if let Some(fut) = &mut self.a {
if let Poll::Ready(u) = fut.poll(wake) {
self.a.take();
self.output_a.write(u);
}
}
if let Some(fut) = &mut self.b {
if let Poll::Ready(v) = fut.poll(wake) {
self.a.take();
self.output_b.write(v);
}
}
if self.a.is_none() && self.b.is_none() {
// SAFETY: output has been initialized when the associated future was take.
let output = unsafe {
(self.output_a.assume_init(), self.output_b.assume_init())
};
Poll::Ready(output)
} else {
Poll::Pending
}
}
}
struct Then<A, B, U, V>
where
A: Future<Output = U>,
B: Future<Output = V>
{
a: Option<A>,
b: B,
output_a: MaybeUninit<U>,
}
impl<A, B, U, V> Future for Join<A, B, U, V>
where
A: Future<Output = U>,
B: Future<Output = V>
{
type Output = (U, V);
fn poll(&mut self, callback: fn()) -> Poll<Self::Output> {
if let Some(fut) = &mut self.a {
if let Poll::Ready(u) = fut.poll(wake) {
self.a.take();
self.output_a.write(u);
} else {
return Poll::Pending;
}
}
match self.b.poll(wake) {
Poll::Ready(output_b) => {
let output = unsafe {
(self.output_a.assume_init(), output_b)
};
Poll::Ready(output)
}
Poll::Pending => Poll::Pending
}
}
}
有了Join
和Then
就足以表示各种Future
之间的控制流关系了。事实上,rust中的所有async
块和函数都会被隐式地转化成一个实现了Future
的类型。而await
其实是规定了这些Future
之间的控制流关系。
Executor和Runtime
上面调用Future
的await_call
被称为Executor
,实际生产中的Executor
基本原理是一致的,只是更为复杂。
这样简单的模型有一个后果,在async rust甫一问世的时候引起了一些人的疑惑,那就是这样的代码
let async_1 = async {
println!("1");
};
println!("2");
futures::executor::block_on(async_1);
永远会先打印2
再打印1
,与大家经常接触的原生异步的javascript等语言行为不一致。其原因很明显,async_1
其实只是一个实现了Future
的匿名类型,要到block_on
里才会被真正poll
一下。
现在我们来考虑一下Executor
该怎么实现。首先,Executor
当然应该具有独立运行的能力,并且能接收别的代码传过来的Future
,rust中常用的mpsc
队列可以帮助我们实现这两点:
struct Task {
future: Mutex<BoxFuture<'static, ()>>,
sender: Sender<Arc<Task>>
}
struct Spawner {
sender: Sender<Arc<Task>>
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
let future = future.boxed();
let task = Task {
future,
sender: self.sender.clone()
};
self.sender.send(Arc::new(task));
}
}
struct Executor {
queue: Receiver<Arc<Task>>
}
impl Executor {
fn run(&self) {
while let Some(task) = self.queue.recv() {
let wake_task = Arc::clone(&task);
let callback = move || wake_task.sender.send(Arc::clone(&wake_task));
let mut fut = task.future.lock().unwrap();
future.poll(callback)
}
}
}
Spawner
通过队列将异步任务装箱发送给Executor
,由于这可能涉及跨线程传输,我们要求异步任务也是Send
和'static
的(就像tokio要求的一样)。Executor
则等待自己的接收队列,收到新的task
就poll
一下其中的future
。注意Executor
的队列中不仅有Spawner
发送过来的,也可能有callback
发送过来的,提醒Executor
“现在这个Future
需要poll
一下了”。 为了让这个Task
能被传来传去,我们不得不给它套个Arc
,相应地Future
也必须套上Mutex
。
这样一改造rust的异步就不再像上一节末尾那样看上去那么生硬了,只要在async_1
那里把async
块spawn
一下,我们就可以实现println!("1")
和println!("2")
真正的异步执行。
在实际生产代码中,Executor
可能有很多个,管理这些Executor
的就是Runtime
,也就是所谓的“运行时”。tokio
这类库正是给出了一套Runtime
和相应的Executor
,才能让rust具备完整的异步能力。
由于我们这一套体系没有脱离线程的运行机制,如果Future
被同步阻塞了,自然也会阻塞整个Executor
,从而牵连属于同一个Executor
的其它任务。为此tokio
才提供了一套异步IO接口和Mutex
这类同步原语。设想某个持有锁的task A陷入await
,调度器切换到另一个task B,task B又去尝试获取锁,当然无法获取,于是它就一直把整个线程卡在这里……不过由于tokio
的异步Mutex
开销过于昂贵,很多时候我们更倾向于使用标准库的Mutex
,除非MutexGuard
需要跨越await
。
Pin和Unpin和!Unpin
到此为止,我们知道了rust中异步代码是如何被执行的,那么请一个问题:有可能把一个引用塞进Future
吗?
在前面的实现中,我们可以看到在主流Runtime中(潜台词是,我们可以选择别的或者自己实现一个Runtime规避这后述问题),Future
是可能被在线程间传来传去的:Spawner
和Executor
可能在两个线程上,Executor
甚至也可能被运行时在线程间搬来搬去。所以只传一个引用是非常危险的,很多时候我们都会需要async move
把变量整体移动进去。然而,这可能引发一个很严重的问题:
async {
let mut x = vec![0; 64];
read(&mut x).await;
println!("{:?}", x);
}
这会被变成类似以下的代码
struct AsyncBlock<'a> {
x: Vec<u8>,
async_read: AsyncRead<'a>
}
struct AsyncRead<'a> {
buf: &'a mut [u8]
}
两个类型都实现了Future
,而且产生了rust最不想人写出的代码:一个含有自引用的结构体。这类代码的问题是,如果AsyncBlock
中的x
被移走了,AsyncRead
中的引用仍然会指向原来的位置,此时解引用会非常危险:
// async_block: AsyncBlock
let theft_x = std::mem::take(&mut async_block.x);
// 现在theft_x已经接管了async_block.async_read.buf所引用的内存
drop(theft_x);
// Boom!
let buf = &*async_block.async_read.buf;
为了解决这个问题,rust引入了Pin
,这是一种包装在指针上的类型,它内部包裹的其实是一个指针,指向某个类型。Pin
的思路是限制对被指向的值的使用方式:
// value: Foo
// 获取value的所有权,得到一个Pin<&mut Foo>
let pinned = pin!(value)
// 然后我们再也无法从pinned拿到&mut Foo
// 如果Foo可以被deref成别的类型,假设叫Bar
// 使用as_mut得到Pin<&mut Bar>
let pinned_bar = pinned.as_mut()
首先,获取了value
的所有权,就表示此时不可能再存在别的&mut value
,随后我们限制在Pin<&mut Foo>
上的操作,令其无法返回&mut Foo
,就杜绝了通过replace
等方法移走Foo
的可能性,当然,因为Pin
一直持有所有权,也不可能把所有权从其内部移走。
由于无法从Pin<&mut T>
获取&mut T
,Future::poll
的签名也要相应修改,要求传入的是Pin<&mut self>
,这就是实际的签名版本。Pin<Ptr<T>>
可以deref
到&T
,但是不可能deref_mut
到&mut T
。
熟悉C++一千种segmentfault的会问,如果拿到了&AsyncBlock
然后拷贝一份会怎么样?答案是生命周期和所有权机制来保证:所有权机制保证含&mut T
的无法被拷贝,生命周期保证AsyncBlock<'_>
和其拷贝必须同年同月同日死,这样内部的不可变引用就不会出问题。
当然Pin<&mut T>
本身也是含生命周期标注的,如果我们需要把变量作为返回值,或者在堆上传来传去,就需要利用Box::into_pin
把它钉在堆上。
IMPORTANT
drop
函数的签名仍然是获取&mut self
而不是Pin<&mut Self>
。所以在此类类型的drop
函数里需要特别注意,相当于编译器自动进行了一次Pin::get_unchecked_mut
,在safe rust中这不会有什么问题,但是当执行unsafe
操作时要时刻记得这本来可能是个被Pin
住的类型。
但是,我们可以自然地想见,如果有某种方法从&T
获得&mut !Unpin
,那Pin
就危险了。事实上这是一个已知的unsoundness,标准库中为了解决此类问题已经禁止了给&T
类型实现DerefMut
,而且专门添加一个unsafe trait PinCoerceUnsized
。然而问题还没有结束,至今#85099依然可以在stable rust中构造出这种错误。
另外有一点可能会反直觉的地方是,Pin
本身是可以移动的,因为它只是个指针的包裹器,不可移动的是Pin
所指向的对象。
Pin
针对的只是一小部分自引用类型,大多数类型其实并不需要Pin
的各限制,为此诞生了Unpin
,和Pin
不同,这是一个trait。Pin<T: Unpin>
提供了更多的方法,允许用户获取其可变引用甚至是所有权,也就是说,对这些类型Pin
其实并没有做任何限制。相反的就是那些被标记了!Unpin
的类型,主要是各种async
代码生成的Future
,用户也可以通过给字段加PhantomPinned
来为自定义结构加上这个属性,对!Unpin
的类型Pin
就会像上面所说的那样工作。
那么为什么会需要Unpin
呢?这其实是绕了一个大弯,因为async
块返回的Future
可能是个!Unpin
类型,于是我们把默认生成的Future
全视为!Unpin
,然后把Future::poll
的签名改了。但如果用户自定义了一个不需要被Pin
住的Future
又会如何?此时用户仍然需要给poll
传一个被Pin
包裹的类型,但显然此时再对其加上和!Unpin
类型一样的限制是没有必要的,于是专门开了个Unpin
的后门。