CSP 异步模型

另外一种能保证安全的的并发模型是CSP模型, 也就是消息传递(message passing). 这里的线程通过发送包含数据的消息来相互沟通. 它来自于Go语言的异步范式:

不要通过共享内存来通讯, 而是通过通讯来共享内存.

Don't communicate by sharing memory, instead, share memory by communicating.

Rust 标准库通过消息通道(channel)来支持CSP模型. 通道由两部分构成, 一个发送者(transmitter), 一个接受者(receiver). 可以通过 transmitter 发送数据, receiver者可以接收到这份数据. 当他们俩任何一个被销毁时, 通道也被销毁(关闭, closed).

mpsc 是多个生产者, 单个消费者( multiple producer, single consumer ), 它创建的通道意味着可以有多个生产者, 但只能有一个接收者.

Try in playground

Rust展开
123456789101112131415161718192021222324252627282930
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main(){
    let (tx, rx) = mpsc::channel();

    for i in 0..5 {
      let tx = tx.clone();
      thread::spawn(move ||{
        let data = format!("hi, i'm transmiter {i}");
        thread::sleep(Duration::from_millis(1));
        tx.send(data).unwrap();
        // println!("data = {data}");
      });

    }
    

    for data in rx {
      println!("Got: {data}");
    }
}

/**************output****************/
Got: hi, i'm transmiter 4
Got: hi, i'm transmiter 2
Got: hi, i'm transmiter 1
Got: hi, i'm transmiter 0
Got: hi, i'm transmiter 3

对 rx 迭代会调用rx.recv方法, 这个方法会阻塞线程直到从通道中接受到一个值, 并以Result<T, E>的形式返回.当通道被关闭后, 他将接收到一个错误, 表明不会有新的数据了.

使用send发送的数据将会转移所有权, 也就意味着不能在发送后再使用这份数据. 他将会触发一个编译错误.

Rust展开
123456789101112
error[E0382]: borrow of moved value: `data`
  --> src/main.rs:14:27
   |
11 |         let data = format!("hi, i'm transmiter {i}");
   |             ---- move occurs because `data` has type `String`, which does not implement the `Copy` trait
12 |         thread::sleep(Duration::from_millis(1));
13 |         tx.send(data).unwrap();
   |                 ---- value moved here
14 |         println!("data = {data}");
   |                           ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

- roadup -