👈 13 智能指针
使用线程同时运行代码
使用 spawn
创建新线程
Rust 标准库使用 1:1 线程实现,即一个语言级线程对应一个系统线程。
调用 thread::spawn
并传递一个闭包以创建新线程:
thread::spawn(|| {
// ...
});
使用 join
等待所有线程结束
thread::spawn
返回一个 JoinHandle
,对其调用 join()
可以强制其等待线程结束,确保线程在主线程退出前完成执行:
fn main() {
let handle = thread::spawn(|| {
// ...
});
handle.join().unwrap();
}
实际上,调用 handle.join()
将阻塞当前线程直到 handle
线程结束(因此 join()
调用的位置很重要)。
将 move
闭包与线程一同使用
尝试在子线程中打印在主线程中定义的 vector:
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("{:?}", v);
});
handle.join().unwrap();
}
无法过编:
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
--> src\main.rs:6:32
|
6 | let handle = thread::spawn(|| {
| ^^ may outlive borrowed value `v`
7 | println!("{:?}", v);
| - `v` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src\main.rs:6:18
|
6 | let handle = thread::spawn(|| {
| __________________^
7 | | println!("{:?}", v);
8 | | });
| |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
|
6 | let handle = thread::spawn(move || {
| ++++
错误信息表明闭包可能比当前函数活得更久,但它捕获了当前函数的变量 v
,也就是说,在线程使用 v
时其可能已经失效。比如,有可能在下文调用 drop(v)
:
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| {
println!("{:?}", v);
});
drop(v); // v 失效
handle.join().unwrap();
}
编译器建议使用 move
强制闭包获取 v
的所有权:
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("{:?}", v);
});
handle.join().unwrap();
}
使用消息传递在线程间传送数据
消息传递(message passing) 是一个确保安全并发的方式,线程或 actor 通过发送包含数据的消息进行沟通。
Rust 标准库提供了一个 信道(channel) 实现,这是一个通用编程概念,表示数据从一个线程发送到另一个。信道由 发送者(transmitter) 和 接收者(receiver) 组成。
可以将信道想象成一条河流,在上游放入一个橡皮鸭(发送),它会随水流到达下游(接收)。
调用 mpsc::channel
创建一个信道:
let (tx, rx) = mpsc::channel();
mpsc
是 多生产者,单消费者(multiple producer, single consumer) 的缩写,tx
、rx
分别是发送者、接收者的惯用缩写。
让 tx
在子线程中发送消息,rx
在主线程中接收该消息:
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
Got: hi
信道与所有权转移
尝试在 tx
发送完 val
后再使用 val
:
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("{val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
无法过编:
error[E0382]: borrow of moved value: `val`
--> src\main.rs:9:19
|
7 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
8 | tx.send(val).unwrap();
| --- value moved here
9 | println!("{val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
|
8 | tx.send(val.clone()).unwrap();
| ++++++++
当线程将值发送出去后,其他线程对值的修改可能会由于不一致而导致错误或意外的结果,因此 send()
被设计为获取参数的所有权,以防在发送后再意外地使用这个值。
发送多个值并观察接收者的等待
现在,子线程有一个 Vec<String>
要发送到主线程,单独发送每个字符串并暂停 1
秒;在主线程中,不再显式调用 recv()
,而是将 rx
作为一个迭代器使用,当信道关闭时迭代器也结束:
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals: Vec<String> =
["hi", "from", "the", "thread"]
.iter()
.map(|x| x.to_string())
.collect();
for val in vals {
thread::sleep(Duration::from_secs(1));
tx.send(val).unwrap();
}
});
for received in rx {
println!("Got: {received}");
}
}
观察输出,每隔 1
秒才会打印出新的一行:
Got: hi
Got: from
Got: the
Got: thread
这直观地演示了信道的作用:主线程中没有任何暂停或等待的代码,之所以出现 1
秒的间隔,就是因为主线程在等待从子线程中接收新的消息。
通过克隆发送者来创建多个生产者
既然 mpsc
的意思是有多个生产者,可以通过克隆 tx
获得一个新的发送者,在两个子线程中分别向同一个接收者发送消息:
fn main() {
let (tx, rx) = mpsc::channel();
let tx_cloned = tx.clone(); // 克隆
thread::spawn(move || {
let vals: Vec<String> =
["hi", "from", "the", "thread"]
.iter()
.map(|x| x.to_string())
.collect();
for val in vals {
thread::sleep(Duration::from_secs(1));
tx.send(val).unwrap();
}
});
thread::spawn(move || { // 新增的子线程
let vals: Vec<String> =
["more", "messages", "for", "you"]
.iter()
.map(|x| x.to_string())
.collect();
for val in vals {
thread::sleep(Duration::from_secs(1));
tx_cloned.send(val).unwrap();
}
});
for received in rx {
println!("Got: {received}");
}
}
Got: hi
Got: more
Got: messages
Got: from
Got: for
Got: the
Got: thread
Got: you
共享状态并发
除了消息传递,多个线程还可以拥有相同的共享数据进行并发。
互斥器(mutex) 是 mutual exclusion 的缩写,意味着任意时刻只允许一个线程访问某些数据。互斥器通过 锁(lock) 系统 保护(guarding) 其数据。
互斥器难以使用的原因在于必须牢记其规则:
- 在访问数据前要先获取锁。
- 处理完数据后要释放锁,以便其他线程获取锁。
Rust 中,类型系统和所有权规则保证了程序不会在获取锁或解锁上出错。
互斥器一次只允许一个线程访问数据
在单线程上下文中使用 Mutex<T>
,在子块中获取锁并修改其值,然后在外部打印出修改后的值:
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
m = Mutex { data: 6, poisoned: false, .. }
调用 lock()
将尝试获取锁,它将阻塞当前线程直到其拥有锁。如果锁正被另一个线程占用,此时后者 panic,则 lock()
调用将失败且没有线程能够获取这个锁。
获取锁之后,就可以将返回值(此为 num
)视为一个其内部数据的可变引用。
Mutex<T>
是一个智能指针,确切地说,lock()
返回一个智能指针 MutexGuard
,它实现了 Deref
trait 指向其内部数据,也实现了 Drop
trait 以在离开作用域时自动释放锁,确保锁被释放。
Mutex<T>
的 API
在线程间共享 Mutex<T>
尝试启动 10 个线程,在每个线程中对同一个计数器 +1
,猜测计数器将从 0
加到 10
:
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
无法过编:
error[E0382]: use of moved value: `counter`
--> src\main.rs:8:36
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
9 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
错误信息表明,在先前的循环中,counter
的所有权被移入闭包,因而不能在后续的循环中再次使用。
多线程和多所有权
尝试用多所有权智能指针 Rc<T>
修复上文代码,实现多线程上下文中的线程共享 Mutex<T>
:
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
无法过编:
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src\main.rs:10:36
|
10 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `{closure@src\main.rs:10:36: 10:43}`
| | |
| | required by a bound introduced by this call
11 | | let mut num = counter.lock().unwrap();
12 | | *num += 1;
13 | | });
| |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
|
= help: within `{closure@src\main.rs:10:36: 10:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
--> src\main.rs:10:36
|
10 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> C:\Users\spygl\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:681:8
|
678 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
| ----- required by a bound in this function
...
681 | F: Send + 'static,
| ^^^^ required by this bound in `spawn`
错误信息表明,Rc<Mutex<i32>> cannot be sent between threads safely
,因为它没有实现 Send
trait,后者是一个确保类型可以安全用于并发的 trait。
Rc<T>
加减计数的方式是 clone()
的调用和克隆的丢弃,这个过程不是原子性的,因而无法安全用于并发。
原子引用计数 Arc<T>
Arc<T>
是一个类似 Rc<T>
、但可以安全用于并发的类型,Arc
是 原子引用计数(atomically reference counted) 的缩写。
Arc<T>
和 Rc<T>
的 API 相同,直接修改上文代码:
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
Result: 10
运行的结果正如一开始的期望。
RefCell<T>
/Rc<T>
与 Mutex<T>
/Arc<T>
的相似性
正如 RefCell<T>
可以改变 Rc<T>
的内容,也可以用 Mutex<T>
改变 Arc<T>
的内容。
Rust 不能避免 Mutex<T>
的全部逻辑错误,正如两个 Rc<T>
相互引用造成的内存泄漏,Mutex<T>
也有造成死锁的风险。
使用 Sync
和 Send
trait 的可扩展并发
并发几乎全部包含在标准库中,而非 Rust 本身。但 std::marker
中的 Sync
和 Send
trait 是语言本身特性。
通过 Send
允许在线程间转移所有权
实现了 Send
trait 的类型,其值的所有权可以在线程间传送。几乎所有类型都实现了 Send
,但如 Rc<T>
等并不。
Sync
允许多线程访问
实现了 Sync
trait 的类型,其可以安全地在多个线程中拥有其值的引用。基本类型都实现了 Sync
,由基本类型构成的类型也是。
对类型 T
,如果 &T
实现了 Send
,则 T
实现了 Sync
。
手动实现 Send
和 Sync
是不安全的
👉 15 面向对象