👈 13 智能指针

使用线程同时运行代码

使用 spawn 创建新线程

Rust 标准库使用 1:1 线程实现,即一个语言级线程对应一个系统线程。

调用 thread::spawn 并传递一个闭包以创建新线程:

thread::spawn(|| {
	// ...
});

使用 join 等待所有线程结束

thread::spawn 返回一个 JoinHandle,对其调用 join() 可以强制其等待线程结束,确保线程在主线程退出前完成执行:

fn main() {
	let handle = thread::spawn(|| {
		// ...
	});
	handle.join().unwrap();
}

实际上,调用 handle.join() 将阻塞当前线程直到 handle 线程结束(因此 join() 调用的位置很重要)。

move 闭包与线程一同使用

尝试在子线程中打印在主线程中定义的 vector:

fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(|| {
        println!("{:?}", v);
    });
    handle.join().unwrap();
}

无法过编:

error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src\main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("{:?}", v);
  |                          - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src\main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("{:?}", v);
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

错误信息表明闭包可能比当前函数活得更久,但它捕获了当前函数的变量 v,也就是说,在线程使用 v 时其可能已经失效。比如,有可能在下文调用 drop(v)

fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(|| {
        println!("{:?}", v);
    });
    drop(v); // v 失效
    handle.join().unwrap();
}

编译器建议使用 move 强制闭包获取 v 的所有权:

help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(move || {
        println!("{:?}", v);
    });
    handle.join().unwrap();
}

使用消息传递在线程间传送数据

消息传递(message passing) 是一个确保安全并发的方式,线程或 actor 通过发送包含数据的消息进行沟通。

Rust 标准库提供了一个 信道(channel) 实现,这是一个通用编程概念,表示数据从一个线程发送到另一个。信道由 发送者(transmitter) 和 接收者(receiver) 组成。

可以将信道想象成一条河流,在上游放入一个橡皮鸭(发送),它会随水流到达下游(接收)。

调用 mpsc::channel 创建一个信道:

let (tx, rx) = mpsc::channel();

mpsc多生产者,单消费者(multiple producer, single consumer) 的缩写,txrx 分别是发送者、接收者的惯用缩写。

tx 在子线程中发送消息,rx 在主线程中接收该消息:

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
Got: hi

信道与所有权转移

尝试在 tx 发送完 val 后再使用 val

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("{val}");
    });
    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

无法过编:

error[E0382]: borrow of moved value: `val`
 --> src\main.rs:9:19
  |
7 |         let val = String::from("hi");
  |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
8 |         tx.send(val).unwrap();
  |                 --- value moved here
9 |         println!("{val}");
  |                   ^^^^^ value borrowed here after move
  |
  = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
  |
8 |         tx.send(val.clone()).unwrap();
  |                    ++++++++

当线程将值发送出去后,其他线程对值的修改可能会由于不一致而导致错误或意外的结果,因此 send() 被设计为获取参数的所有权,以防在发送后再意外地使用这个值。

发送多个值并观察接收者的等待

现在,子线程有一个 Vec<String> 要发送到主线程,单独发送每个字符串并暂停 1 秒;在主线程中,不再显式调用 recv(),而是将 rx 作为一个迭代器使用,当信道关闭时迭代器也结束:

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals: Vec<String> =
            ["hi", "from", "the", "thread"]
                .iter()
                .map(|x| x.to_string())
                .collect();
        for val in vals {
            thread::sleep(Duration::from_secs(1));
            tx.send(val).unwrap();
        }
    });
    for received in rx {
        println!("Got: {received}");
    }
}

观察输出,每隔 1 秒才会打印出新的一行:

Got: hi
Got: from
Got: the
Got: thread

这直观地演示了信道的作用:主线程中没有任何暂停或等待的代码,之所以出现 1 秒的间隔,就是因为主线程在等待从子线程中接收新的消息。

通过克隆发送者来创建多个生产者

既然 mpsc 的意思是有多个生产者,可以通过克隆 tx 获得一个新的发送者,在两个子线程中分别向同一个接收者发送消息:

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx_cloned = tx.clone(); // 克隆
    thread::spawn(move || {
        let vals: Vec<String> =
            ["hi", "from", "the", "thread"]
                .iter()
                .map(|x| x.to_string())
                .collect();
        for val in vals {
            thread::sleep(Duration::from_secs(1));
            tx.send(val).unwrap();
        }
    });
    thread::spawn(move || { // 新增的子线程
        let vals: Vec<String> =
            ["more", "messages", "for", "you"]
                .iter()
                .map(|x| x.to_string())
                .collect();
        for val in vals {
            thread::sleep(Duration::from_secs(1));
            tx_cloned.send(val).unwrap();
        }
    });
    for received in rx {
        println!("Got: {received}");
    }
}
Got: hi
Got: more
Got: messages
Got: from
Got: for
Got: the
Got: thread
Got: you

共享状态并发

除了消息传递,多个线程还可以拥有相同的共享数据进行并发。

互斥器(mutex) 是 mutual exclusion 的缩写,意味着任意时刻只允许一个线程访问某些数据。互斥器通过 (lock) 系统 保护(guarding) 其数据。

互斥器难以使用的原因在于必须牢记其规则:

  • 在访问数据前要先获取锁。
  • 处理完数据后要释放锁,以便其他线程获取锁。

Rust 中,类型系统和所有权规则保证了程序不会在获取锁或解锁上出错。

互斥器一次只允许一个线程访问数据

在单线程上下文中使用 Mutex<T>,在子块中获取锁并修改其值,然后在外部打印出修改后的值:

let m = Mutex::new(5);
{
	let mut num = m.lock().unwrap();
	*num = 6;
}
println!("m = {:?}", m);
m = Mutex { data: 6, poisoned: false, .. }

调用 lock() 将尝试获取锁,它将阻塞当前线程直到其拥有锁。如果锁正被另一个线程占用,此时后者 panic,则 lock() 调用将失败且没有线程能够获取这个锁。

获取锁之后,就可以将返回值(此为 num)视为一个其内部数据的可变引用。

Mutex<T> 是一个智能指针,确切地说,lock() 返回一个智能指针 MutexGuard,它实现了 Deref trait 指向其内部数据,也实现了 Drop trait 以在离开作用域时自动释放锁,确保锁被释放。

Mutex<T> 的 API

在线程间共享 Mutex<T>

尝试启动 10 个线程,在每个线程中对同一个计数器 +1,猜测计数器将从 0 加到 10

let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
	let handle = thread::spawn(move || {
		let mut num = counter.lock().unwrap();
		*num += 1;
	});
	handles.push(handle);
}
for handle in handles {
	handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());

无法过编:

error[E0382]: use of moved value: `counter`
 
 --> src\main.rs:8:36
  |
5 |     let counter = Mutex::new(0);
  |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8 |         let handle = thread::spawn(move || {
  |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
9 |             let mut num = counter.lock().unwrap();
  |                           ------- use occurs due to use in closure

错误信息表明,在先前的循环中,counter 的所有权被移入闭包,因而不能在后续的循环中再次使用。

多线程和多所有权

尝试用多所有权智能指针 Rc<T> 修复上文代码,实现多线程上下文中的线程共享 Mutex<T>

let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
	let counter = Rc::clone(&counter);
	let handle = thread::spawn(move || {
		let mut num = counter.lock().unwrap();
		*num += 1;
	});
	handles.push(handle);
}
for handle in handles {
	handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());

无法过编:

error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
   --> src\main.rs:10:36
    |
10  |           let handle = thread::spawn(move || {
    |                        ------------- ^------
    |                        |             |
    |  ______________________|_____________within this `{closure@src\main.rs:10:36: 10:43}`
    | |                      |
    | |                      required by a bound introduced by this call
11  | |             let mut num = counter.lock().unwrap();
12  | |             *num += 1;
13  | |         });
    | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
    |
    = help: within `{closure@src\main.rs:10:36: 10:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
   --> src\main.rs:10:36
    |
10  |         let handle = thread::spawn(move || {
    |                                    ^^^^^^^
note: required by a bound in `spawn`
   --> C:\Users\spygl\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:681:8
    |
678 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    |        ----- required by a bound in this function
...
681 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`

错误信息表明,Rc<Mutex<i32>> cannot be sent between threads safely,因为它没有实现 Send trait,后者是一个确保类型可以安全用于并发的 trait。

Rc<T> 加减计数的方式是 clone() 的调用和克隆的丢弃,这个过程不是原子性的,因而无法安全用于并发。

原子引用计数 Arc<T>

Arc<T> 是一个类似 Rc<T>、但可以安全用于并发的类型,Arc原子引用计数(atomically reference counted) 的缩写。

Arc<T>Rc<T> 的 API 相同,直接修改上文代码:

let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
	let counter = Arc::clone(&counter);
	let handle = thread::spawn(move || {
		let mut num = counter.lock().unwrap();
		*num += 1;
	});
	handles.push(handle);
}
for handle in handles {
	handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
Result: 10

运行的结果正如一开始的期望。

RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T> 的相似性

正如 RefCell<T> 可以改变 Rc<T> 的内容,也可以用 Mutex<T> 改变 Arc<T> 的内容。

Rust 不能避免 Mutex<T> 的全部逻辑错误,正如两个 Rc<T> 相互引用造成的内存泄漏,Mutex<T> 也有造成死锁的风险。

使用 Sync 和 Send trait 的可扩展并发

并发几乎全部包含在标准库中,而非 Rust 本身。但 std::marker 中的 SyncSend trait 是语言本身特性。

通过 Send 允许在线程间转移所有权

实现了 Send trait 的类型,其值的所有权可以在线程间传送。几乎所有类型都实现了 Send,但如 Rc<T> 等并不。

Sync 允许多线程访问

实现了 Sync trait 的类型,其可以安全地在多个线程中拥有其值的引用。基本类型都实现了 Sync,由基本类型构成的类型也是。

对类型 T,如果 &T 实现了 Send,则 T 实现了 Sync

手动实现 SendSync 是不安全的

👉 15 面向对象