使用线程同时运行代码
使用 spawn 创建新线程
Rust 标准库使用 1:1 线程实现,即一个语言级线程对应一个系统线程。
调用 thread::spawn 并传递一个闭包以创建新线程:
thread::spawn(|| {
// ...
});使用 join 等待所有线程结束
thread::spawn 返回一个 JoinHandle,对其调用 join() 可以强制其等待线程结束,确保线程在主线程退出前完成执行:
fn main() {
let handle = thread::spawn(|| {
// ...
});
handle.join().unwrap();
}实际上,调用 handle.join() 将阻塞当前线程直到 handle 线程结束(因此 join() 调用的位置很重要)。
将 move 闭包与线程一同使用
尝试在子线程中打印在主线程中定义的 vector:
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("{:?}", v);
});
handle.join().unwrap();
}无法过编:
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
--> src\main.rs:6:32
|
6 | let handle = thread::spawn(|| {
| ^^ may outlive borrowed value `v`
7 | println!("{:?}", v);
| - `v` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src\main.rs:6:18
|
6 | let handle = thread::spawn(|| {
| __________________^
7 | | println!("{:?}", v);
8 | | });
| |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
|
6 | let handle = thread::spawn(move || {
| ++++错误信息表明闭包可能比当前函数活得更久,但它捕获了当前函数的变量 v,也就是说,在线程使用 v 时其可能已经失效。比如,有可能在下文调用 drop(v):
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("{:?}", v);
});
drop(v); // v 失效
handle.join().unwrap();
}编译器建议使用 move 强制闭包获取 v 的所有权:
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keywordfn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("{:?}", v);
});
handle.join().unwrap();
}使用消息传递在线程间传送数据
消息传递(message passing) 是一个确保安全并发的方式,线程或 actor 通过发送包含数据的消息进行沟通。
Rust 标准库提供了一个 信道(channel) 实现,这是一个通用编程概念,表示数据从一个线程发送到另一个。信道由 发送者(transmitter) 和 接收者(receiver) 组成。
可以将信道想象成一条河流,在上游放入一个橡皮鸭(发送),它会随水流到达下游(接收)。
调用 mpsc::channel 创建一个信道:
let (tx, rx) = mpsc::channel();mpsc 是 多生产者,单消费者(multiple producer, single consumer) 的缩写,tx、rx 分别是发送者、接收者的惯用缩写。
让 tx 在子线程中发送消息,rx 在主线程中接收该消息:
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}Got: hi信道与所有权转移
尝试在 tx 发送完 val 后再使用 val:
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("{val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}无法过编:
error[E0382]: borrow of moved value: `val`
--> src\main.rs:9:19
|
7 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
8 | tx.send(val).unwrap();
| --- value moved here
9 | println!("{val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
|
8 | tx.send(val.clone()).unwrap();
| ++++++++当线程将值发送出去后,其他线程对值的修改可能会由于不一致而导致错误或意外的结果,因此 send() 被设计为获取参数的所有权,以防在发送后再意外地使用这个值。
发送多个值并观察接收者的等待
现在,子线程有一个 Vec<String> 要发送到主线程,单独发送每个字符串并暂停 1 秒;在主线程中,不再显式调用 recv(),而是将 rx 作为一个迭代器使用,当信道关闭时迭代器也结束:
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals: Vec<String> =
["hi", "from", "the", "thread"]
.iter()
.map(|x| x.to_string())
.collect();
for val in vals {
thread::sleep(Duration::from_secs(1));
tx.send(val).unwrap();
}
});
for received in rx {
println!("Got: {received}");
}
}观察输出,每隔 1 秒才会打印出新的一行:
Got: hi
Got: from
Got: the
Got: thread这直观地演示了信道的作用:主线程中没有任何暂停或等待的代码,之所以出现 1 秒的间隔,就是因为主线程在等待从子线程中接收新的消息。
通过克隆发送者来创建多个生产者
既然 mpsc 的意思是有多个生产者,可以通过克隆 tx 获得一个新的发送者,在两个子线程中分别向同一个接收者发送消息:
fn main() {
let (tx, rx) = mpsc::channel();
let tx_cloned = tx.clone(); // 克隆
thread::spawn(move || {
let vals: Vec<String> =
["hi", "from", "the", "thread"]
.iter()
.map(|x| x.to_string())
.collect();
for val in vals {
thread::sleep(Duration::from_secs(1));
tx.send(val).unwrap();
}
});
thread::spawn(move || { // 新增的子线程
let vals: Vec<String> =
["more", "messages", "for", "you"]
.iter()
.map(|x| x.to_string())
.collect();
for val in vals {
thread::sleep(Duration::from_secs(1));
tx_cloned.send(val).unwrap();
}
});
for received in rx {
println!("Got: {received}");
}
}Got: hi
Got: more
Got: messages
Got: from
Got: for
Got: the
Got: thread
Got: you共享状态并发
除了消息传递,多个线程还可以拥有相同的共享数据进行并发。
互斥器(mutex) 是 mutual exclusion 的缩写,意味着任意时刻只允许一个线程访问某些数据。互斥器通过 锁(lock) 系统 保护(guarding) 其数据。
互斥器难以使用的原因在于必须牢记其规则:
- 在访问数据前要先获取锁。
- 处理完数据后要释放锁,以便其他线程获取锁。
Rust 中,类型系统和所有权规则保证了程序不会在获取锁或解锁上出错。
互斥器一次只允许一个线程访问数据
在单线程上下文中使用 Mutex<T>,在子块中获取锁并修改其值,然后在外部打印出修改后的值:
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);m = Mutex { data: 6, poisoned: false, .. }调用 lock() 将尝试获取锁,它将阻塞当前线程直到其拥有锁。如果锁正被另一个线程占用,此时后者 panic,则 lock() 调用将失败且没有线程能够获取这个锁。
获取锁之后,就可以将返回值(此为 num)视为一个其内部数据的可变引用。
Mutex<T> 是一个智能指针,确切地说,lock() 返回一个智能指针 MutexGuard,它实现了 Deref trait 指向其内部数据,也实现了 Drop trait 以在离开作用域时自动释放锁,确保锁被释放。
Mutex<T> 的 API
在线程间共享 Mutex<T>
尝试启动 10 个线程,在每个线程中对同一个计数器 +1,猜测计数器将从 0 加到 10:
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());无法过编:
error[E0382]: use of moved value: `counter`
--> src\main.rs:8:36
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
9 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure错误信息表明,在先前的循环中,counter 的所有权被移入闭包,因而不能在后续的循环中再次使用。
多线程和多所有权
尝试用多所有权智能指针 Rc<T> 修复上文代码,实现多线程上下文中的线程共享 Mutex<T>:
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());无法过编:
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src\main.rs:10:36
|
10 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `{closure@src\main.rs:10:36: 10:43}`
| | |
| | required by a bound introduced by this call
11 | | let mut num = counter.lock().unwrap();
12 | | *num += 1;
13 | | });
| |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
|
= help: within `{closure@src\main.rs:10:36: 10:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
--> src\main.rs:10:36
|
10 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> C:\Users\spygl\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:681:8
|
678 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
| ----- required by a bound in this function
...
681 | F: Send + 'static,
| ^^^^ required by this bound in `spawn`错误信息表明,Rc<Mutex<i32>> cannot be sent between threads safely,因为它没有实现 Send trait,后者是一个确保类型可以安全用于并发的 trait。
Rc<T> 加减计数的方式是 clone() 的调用和克隆的丢弃,这个过程不是原子性的,因而无法安全用于并发。
原子引用计数 Arc<T>
Arc<T> 是一个类似 Rc<T>、但可以安全用于并发的类型,Arc 是 原子引用计数(atomically reference counted) 的缩写。
Arc<T> 和 Rc<T> 的 API 相同,直接修改上文代码:
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());Result: 10运行的结果正如一开始的期望。
RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T> 的相似性
正如 RefCell<T> 可以改变 Rc<T> 的内容,也可以用 Mutex<T> 改变 Arc<T> 的内容。
Rust 不能避免 Mutex<T> 的全部逻辑错误,正如两个 Rc<T> 相互引用造成的内存泄漏,Mutex<T> 也有造成死锁的风险。
使用 Sync 和 Send trait 的可扩展并发
并发几乎全部包含在标准库中,而非 Rust 本身。但 std::marker 中的 Sync 和 Send trait 是语言本身特性。
通过 Send 允许在线程间转移所有权
实现了 Send trait 的类型,其值的所有权可以在线程间传送。几乎所有类型都实现了 Send,但如 Rc<T> 等并不。
Sync 允许多线程访问
实现了 Sync trait 的类型,其可以安全地在多个线程中拥有其值的引用。基本类型都实现了 Sync,由基本类型构成的类型也是。
对类型 T,如果 &T 实现了 Send,则 T 实现了 Sync。