2726 字
14 分钟
Async Rust —— 自底向上方法
2025-03-15

Rust加入原生异步支持的时间相当晚(当然比C++标准委员会动作快多了),所以rust中的异步编程看上去并不像javascript那么自然,除了几乎必然要接触到tokio这类第三方库外,语言本身像是UnpinFuture这类概念也是拦路虎。这里我们从异步尚不存在的rust出发,看看这栋满是补丁的异步破房子是怎么被修建起来的。

Poll和Future: 最简单的原语#

设想我们现在还只有同步代码,要怎么至少“看上去异步”地处理一些IO操作呢?这个问题其实在过去早有答案,linux的pollepoll,windows的iocp,macos的kqueue实现都打好了样板。我们可以参考linux的poll写一段这样的代码

trait Future {
    type Output;
    fn poll(&mut self) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}


fn await_call<T>(mut async_block: impl Future<Output = T>) -> T {
    loop {
        match async_block.poll() {
            Poll::Ready(res) => return res,
            Poll::Pending => continue
        }
    }
}

Future要求实现一个poll函数,当异步任务执行完成时返回Poll::Ready,否则返回Poll::Pending。上层的await_call则不断poll这个函数,直至它返回Poll::Ready

这个模型很基础,但存在一个显而易见的问题:和其蓝本一样,在死循环里不断调一个函数显然太耗能了。为此我们可以要求poll的声明里加入一个回调函数,让Future在底层数据准备好的时候通过回调函数发回一个信号,唤醒await_call再去调用Future::poll

在这个模型的基础上可以进一步封装JoinThen:

struct Join<A, B, U, V> 
where 
    A: Future<Output = U>,
    B: Future<Output = V>
{
    a: Option<A>,
    b: Option<B>,
    output_a: MaybeUninit<U>,
    output_b: MaybeUninit<V>,
}

impl<A, B, U, V> Future for Join<A, B, U, V> 
where 
    A: Future<Output = U>,
    B: Future<Output = V>
{
    type Output = (U, V);
    fn poll(&mut self, callback: fn()) -> Poll<Self::Output> {
        if let Some(fut) = &mut self.a {
            if let Poll::Ready(u) = fut.poll(wake) {
                self.a.take();
                self.output_a.write(u);
            }
        }

        if let Some(fut) = &mut self.b {
            if let Poll::Ready(v) = fut.poll(wake) {
                self.a.take();
                self.output_b.write(v);
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // SAFETY: output has been initialized when the associated future was take.
            let output = unsafe {
                (self.output_a.assume_init(), self.output_b.assume_init())
            };
            Poll::Ready(output)
        } else {
            Poll::Pending
        }
    }
}

struct Then<A, B, U, V> 
where 
    A: Future<Output = U>,
    B: Future<Output = V>
{
    a: Option<A>,
    b: B,
    output_a: MaybeUninit<U>,
}

impl<A, B, U, V> Future for Join<A, B, U, V> 
where 
    A: Future<Output = U>,
    B: Future<Output = V>
{
    type Output = (U, V);
    fn poll(&mut self, callback: fn()) -> Poll<Self::Output> {
        if let Some(fut) = &mut self.a {
            if let Poll::Ready(u) = fut.poll(wake) {
                self.a.take();
                self.output_a.write(u);
            } else {
                return Poll::Pending;
            }
        }

        match self.b.poll(wake) {
            Poll::Ready(output_b) => {
                let output = unsafe {
                    (self.output_a.assume_init(), output_b)
                };
                Poll::Ready(output)
            }
            Poll::Pending => Poll::Pending
        }
    }
}

有了JoinThen就足以表示各种Future之间的控制流关系了。事实上,rust中的所有async块和函数都会被隐式地转化成一个实现了Future的类型。而await其实是规定了这些Future之间的控制流关系。

Executor和Runtime#

上面调用Futureawait_call被称为Executor,实际生产中的Executor基本原理是一致的,只是更为复杂。

这样简单的模型有一个后果,在async rust甫一问世的时候引起了一些人的疑惑,那就是这样的代码

let async_1 = async {
    println!("1");
};
println!("2");
futures::executor::block_on(async_1);

永远会先打印2再打印1,与大家经常接触的原生异步的javascript等语言行为不一致。其原因很明显,async_1其实只是一个实现了Future的匿名类型,要到block_on里才会被真正poll一下。

现在我们来考虑一下Executor该怎么实现。首先,Executor当然应该具有独立运行的能力,并且能接收别的代码传过来的Future,rust中常用的mpsc队列可以帮助我们实现这两点:

struct Task {
    future: Mutex<BoxFuture<'static, ()>>,
    sender: Sender<Arc<Task>>
}

struct Spawner {
    sender: Sender<Arc<Task>>
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        let future = future.boxed();
        let task = Task {
            future,
            sender: self.sender.clone()
        };
        self.sender.send(Arc::new(task));
    }
}

struct Executor {
    queue: Receiver<Arc<Task>>
}

impl Executor {
    fn run(&self) {
        while let Some(task) = self.queue.recv() {
            let wake_task = Arc::clone(&task);
            let callback = move || wake_task.sender.send(Arc::clone(&wake_task));
            let mut fut = task.future.lock().unwrap();
            future.poll(callback)
        }
    }
}

Spawner通过队列将异步任务装箱发送给Executor,由于这可能涉及跨线程传输,我们要求异步任务也是Send'static的(就像tokio要求的一样)。Executor则等待自己的接收队列,收到新的taskpoll一下其中的future。注意Executor的队列中不仅有Spawner发送过来的,也可能有callback发送过来的,提醒Executor“现在这个Future需要poll一下了”。 为了让这个Task能被传来传去,我们不得不给它套个Arc,相应地Future也必须套上Mutex

这样一改造rust的异步就不再像上一节末尾那样看上去那么生硬了,只要在async_1那里把asyncspawn一下,我们就可以实现println!("1")println!("2")真正的异步执行。

在实际生产代码中,Executor可能有很多个,管理这些Executor的就是Runtime,也就是所谓的“运行时”。tokio这类库正是给出了一套Runtime和相应的Executor,才能让rust具备完整的异步能力。

由于我们这一套体系没有脱离线程的运行机制,如果Future被同步阻塞了,自然也会阻塞整个Executor,从而牵连属于同一个Executor的其它任务。为此tokio才提供了一套异步IO接口和Mutex这类同步原语。设想某个持有锁的task A陷入await,调度器切换到另一个task B,task B又去尝试获取锁,当然无法获取,于是它就一直把整个线程卡在这里……不过由于tokio的异步Mutex开销过于昂贵,很多时候我们更倾向于使用标准库的Mutex,除非MutexGuard需要跨越await

Pin和Unpin和!Unpin#

到此为止,我们知道了rust中异步代码是如何被执行的,那么请一个问题:有可能把一个引用塞进Future吗?

在前面的实现中,我们可以看到在主流Runtime中(潜台词是,我们可以选择别的或者自己实现一个Runtime规避这后述问题),Future是可能被在线程间传来传去的:SpawnerExecutor可能在两个线程上,Executor甚至也可能被运行时在线程间搬来搬去。所以只传一个引用是非常危险的,很多时候我们都会需要async move把变量整体移动进去。然而,这可能引发一个很严重的问题:

async {
    let mut x = vec![0; 64];
    read(&mut x).await;
    println!("{:?}", x);
}

这会被变成类似以下的代码

struct AsyncBlock<'a> {
    x: Vec<u8>,
    async_read: AsyncRead<'a>
}

struct AsyncRead<'a> {
    buf: &'a mut [u8]
}

两个类型都实现了Future,而且产生了rust最不想人写出的代码:一个含有自引用的结构体。这类代码的问题是,如果AsyncBlock中的x被移走了,AsyncRead中的引用仍然会指向原来的位置,此时解引用会非常危险:

// async_block: AsyncBlock
let theft_x = std::mem::take(&mut async_block.x);
// 现在theft_x已经接管了async_block.async_read.buf所引用的内存
drop(theft_x);
// Boom!
let buf = &*async_block.async_read.buf;

为了解决这个问题,rust引入了Pin,这是一种包装在指针上的类型,它内部包裹的其实是一个指针,指向某个类型。Pin的思路是限制对被指向的值的使用方式:

// value: Foo
// 获取value的所有权,得到一个Pin<&mut Foo>
let pinned = pin!(value)
// 然后我们再也无法从pinned拿到&mut Foo
// 如果Foo可以被deref成别的类型,假设叫Bar
// 使用as_mut得到Pin<&mut Bar>
let pinned_bar = pinned.as_mut()

首先,获取了value的所有权,就表示此时不可能再存在别的&mut value,随后我们限制在Pin<&mut Foo>上的操作,令其无法返回&mut Foo,就杜绝了通过replace等方法移走Foo的可能性,当然,因为Pin一直持有所有权,也不可能把所有权从其内部移走。

由于无法从Pin<&mut T>获取&mut T,Future::poll的签名也要相应修改,要求传入的是Pin<&mut self>,这就是实际的签名版本。Pin<Ptr<T>>可以deref&T,但是不可能deref_mut&mut T

熟悉C++一千种segmentfault的会问,如果拿到了&AsyncBlock然后拷贝一份会怎么样?答案是生命周期和所有权机制来保证:所有权机制保证含&mut T的无法被拷贝,生命周期保证AsyncBlock<'_>和其拷贝必须同年同月同日死,这样内部的不可变引用就不会出问题。

当然Pin<&mut T>本身也是含生命周期标注的,如果我们需要把变量作为返回值,或者在堆上传来传去,就需要利用Box::into_pin把它钉在堆上。

IMPORTANT

drop函数的签名仍然是获取&mut self而不是Pin<&mut Self>。所以在此类类型的drop函数里需要特别注意,相当于编译器自动进行了一次Pin::get_unchecked_mut,在safe rust中这不会有什么问题,但是当执行unsafe操作时要时刻记得这本来可能是个被Pin住的类型。

但是,我们可以自然地想见,如果有某种方法从&T获得&mut !Unpin,那Pin就危险了。事实上这是一个已知的unsoundness,标准库中为了解决此类问题已经禁止了给&T类型实现DerefMut,而且专门添加一个unsafe trait PinCoerceUnsized。然而问题还没有结束,至今#85099依然可以在stable rust中构造出这种错误。

另外有一点可能会反直觉的地方是,Pin本身是可以移动的,因为它只是个指针的包裹器,不可移动的是Pin所指向的对象。

Pin针对的只是一小部分自引用类型,大多数类型其实并不需要Pin的各限制,为此诞生了Unpin,和Pin不同,这是一个trait。Pin<T: Unpin>提供了更多的方法,允许用户获取其可变引用甚至是所有权,也就是说,对这些类型Pin其实并没有做任何限制。相反的就是那些被标记了!Unpin的类型,主要是各种async代码生成的Future,用户也可以通过给字段加PhantomPinned来为自定义结构加上这个属性,对!Unpin的类型Pin就会像上面所说的那样工作。

那么为什么会需要Unpin呢?这其实是绕了一个大弯,因为async块返回的Future可能是个!Unpin类型,于是我们把默认生成的Future全视为!Unpin,然后把Future::poll的签名改了。但如果用户自定义了一个不需要被Pin住的Future又会如何?此时用户仍然需要给poll传一个被Pin包裹的类型,但显然此时再对其加上和!Unpin类型一样的限制是没有必要的,于是专门开了个Unpin的后门。

Async Rust —— 自底向上方法
https://blog.lambdaris.page/posts/async_rust/
作者
Lambdaris
发布于
2025-03-15
许可协议
CC BY-SA 4.0