您现在的位置是:首页 >技术杂谈 >rust多线程网站首页技术杂谈
rust多线程
rust多线程
在rust中,多线程编程不算困难,但是也需要留心和别的编程语言中不同的地方。rust的标准库中提供的thread库来帮助我们进行多线程编程。在使用的时候需要使用use std::thread
来引入thread库即可。
创建线程
使用thread::spawn()
即可创建一个新的线程。spawn的原型如下所示:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
它会返回一个JoinHandle对象,该对象提供了join,允许调用者等待创建的线程执行完成。join方法返回thread::Result
,那么如果线程执行成功,则返回由Ok包裹的值,否则返回Err值。例如:
use std::thread;
fn main() {
let th = thread::spawn(|| println!("Hello thread!")); // 创建新线程
let res = th.join(); // 等待线程执行完毕
match res {
Ok(v) => println!("Success:{:?}", v),
Err(e) => println!("Error:{:?}", e),
}
}
这段代码非常简单,创建了一个线程,让它执行一个闭包,闭包会输出Hello thread!,然后使用join等待线程执行完毕,最后在match中输出线程执行结果。程序执行结果如下所示:
Hello thread!
Success:()
现在,我们让闭包干一点实际的事情,例如计算斐波那契数列的第n项并返回。
use std::thread;
fn main() {
let index = 10; // 斐波那契数列第N项
let th = thread::spawn(move ||{ let mut a = 1;
let mut b = 1;
let mut c = 0;
for i in 0..index-2 {
if i == 1 && i == 2{
c = 1;
break;
}else {
(c, a, b) = (a + b, b, c)
}
}
c
});
let res = th.join();
match res {
Ok(v) => println!("Success:{v}"),
Err(e) => println!("Error:{:?}", e),
}
}
通过join等待线程执行结束,获取到线程的执行结果,然后打印输出。需要注意,我们通过move将index的所有权转移到了闭包中,这是因为该闭包是一个新的线程。rust的所有权机制要求我们将index的所有权转移到新线程中,这样就避免了条件竞争。当我们将index的值改为100,那么闭包中计算的时候将会超出i32的范围,导致panic,此时我们的Error将会打印错误Error:Any { .. }
线程的结束
main 线程是程序的主线程,一旦结束,则程序随之结束,同时各个子线程也将被强行终止。如果父线程不是 main 线程,当父线程终止的时候,子线程如果还在运行,那么它不会受到影响,继续运行。例如:
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个线程A
let new_thread = thread::spawn(move || {
// 再创建一个线程B
thread::spawn(move || {
loop {
println!("I am a new thread.");
}
})
});
// 等待新创建的线程执行完成
new_thread.join().unwrap();
println!("Child thread is finish!");
// 睡眠一段时间,看子线程创建的子线程是否还在运行
thread::sleep(Duration::from_millis(100));
}
以上代码中,main 线程创建了一个新的线程 A,同时该新线程又创建了一个新的线程 B,可以看到 A 线程在创建完 B 线程后就立即结束了,而 B 线程则在不停地循环输出。
线程屏障
屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。有时候是有用的。可以参考线程屏障(barrier)中的多线程排序代码,这是POSIX中的线程,而rust的屏障使用如下所示:
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
let mut handles = Vec::with_capacity(6);
let barrier = Arc::new(Barrier::new(6)); // 创建屏障,阻塞n-1个线程,也就是5个,然后在第6个线程调用wait()时立即唤醒所有线程
for _ in 0..6 {
let b = barrier.clone();
handles.push(thread::spawn(move|| { // 创建新线程
println!("before wait");
b.wait(); // 阻塞当前线程,直到所有线程都在这里会合。在所有线程会合一次后,屏障可以重复使用,并且可以连续使用。
println!("after wait");
}));
}
for handle in handles {
handle.join().unwrap();
}
}
程序执行结果如下所示:
before wait
before wait
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait
after wait
after wait
线程局部变量(Thread Local Variable)
线程局部变量可以保证数据的线程安全性,每个线程都有自己独立的变量副本,互不干扰,不需要使用锁或其他同步机制来保护数据。使用 thread_local 宏可以初始化线程局部变量,然后在线程内部使用该变量的 with 方法获取变量值,例如:
#![allow(unused)]
fn main() {
use std::cell::RefCell;
use std::thread;
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 2;
});
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move|| {
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 3;
});
});
// 等待线程完成
t.join().unwrap();
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
assert_eq!(*f.borrow(), 2);
});
}
上面代码中,FOO 即是我们创建的线程局部变量,每个新的线程访问它时,都会使用它的初始值作为开始,各个线程中的 FOO 值彼此互不干扰。注意 FOO 使用 static 声明为生命周期为 'static 的静态变量。
第三方库 thread-local
除了标准库外,还可以使用 thread-local 库,它允许每个线程持有值的独立拷贝:
use thread_local::ThreadLocal;
use std::sync::Arc;
use std::cell::Cell;
use std::thread;
let tls = Arc::new(ThreadLocal::new());
// 创建多个线程
for _ in 0..5 {
let tls2 = tls.clone();
thread::spawn(move || {
// 将计数器加1
let cell = tls2.get_or(|| Cell::new(0));
cell.set(cell.get() + 1);
}).join().unwrap();
}
// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());
// 和为5
assert_eq!(total, 5);
该库不仅仅使用了值的拷贝,而且还能自动把多个拷贝汇总到一个迭代器中,最后进行求和,非常好用。
只被调用一次的函数
有时,我们会需要某个函数在多线程环境下只被调用一次,例如初始化全局变量,无论是哪个线程先调用函数来初始化,都会保证全局变量只会被初始化一次,随后的其它线程调用就会忽略该函数:
use std::thread;
use std::sync::Once;
static mut VAL: usize = 0;
static INIT: Once = Once::new();
fn main() {
let handle1 = thread::spawn(move || {
INIT.call_once(|| {
unsafe {
VAL = 1;
}
});
});
let handle2 = thread::spawn(move || {
INIT.call_once(|| {
unsafe {
VAL = 2;
}
});
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("{}", unsafe { VAL });
}
代码运行的结果取决于哪个线程先调用 INIT.call_once (虽然代码具有先后顺序,但是线程的初始化顺序并无法被保证!因为线程初始化是异步的,且耗时较久),若 handle1 先,则输出 1,否则输出 2。
注意代码中的unsafe,因此这存在着数据竞争的可能,因此需要使用unsafe中操作。
call_once 方法
- 执行初始化过程一次,并且只执行一次。
- 如果当前有另一个初始化过程正在运行,线程将阻止该方法被调用。
- 当这个函数返回时,保证一些初始化已经运行并完成,它还保证由执行的闭包所执行的任何内存写入都能被其他线程在这时可靠地观察到。
消息通道
与 Go 语言内置的chan不同,Rust 是在标准库里提供了消息通道(channel),但是,在实际使用中,我们需要使用不同的库来满足诸如:多发送者 -> 单接收者,多发送者 -> 多接收者等场景形式。
单发送者,单接受者
标准库提供了通道std::sync::mpsc,其中mpsc是multiple producer, single consumer的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。 当然,支持多个发送者也意味着支持单个发送者,我们先来看看单发送者、单接收者的简单例子:
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();
// 创建线程,并发送消息
thread::spawn(move || {
// 发送一个数字1, send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(1).unwrap();
// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
});
// 在主线程中接收子线程发送的消息并输出
println!("receive {}", rx.recv().unwrap());
}
- tx,rx对应发送者和接收者,它们的类型由编译器自动推导: tx.send(1)发送了整数,因此它们分别是
mpsc::Sender<i32>
和mpsc::Receiver<i32>
类型,需要注意,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值。 - 接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭
- 需要使用move将tx的所有权转移到子线程的闭包中
在注释中提到send方法返回一个Result<T,E>
,说明它有可能返回一个错误,例如接收者被drop导致了发送的值不会被任何人接收,此时继续发送毫无意义,因此返回一个错误最为合适,在代码中我们仅仅使用unwrap进行了快速处理,但在实际项目中你需要对错误进行进一步的处理。
同样的,对于recv方法来说,当发送者关闭时,它也会接收到一个错误,用于说明不会再有任何值被发送过来。
不阻塞的 try_recv 方法
除了上述recv方法,还可以使用try_recv尝试接收一次消息,该方法并不会阻塞线程,当通道中没有消息时,它会立刻返回一个错误:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(1).unwrap();
});
println!("receive {:?}", rx.try_recv());
}
由于子线程的创建需要时间,因此println!和try_recv方法会先执行,而此时子线程的消息还未被发出。try_recv会尝试立即读取一次消息,因为消息没有发出,此次读取最终会报错,且主线程运行结束。
我们可以尝试循环输出
for _ in 0..10 {
println!("receive {:?}", rx.try_recv());
}
然后多运行几次程序。你会得到下面的结果。
......
receive Err(Empty)
receive Ok(1)
receive Err(Disconnected)
......
这就是当子线程创建成功且发送消息后,主线程会接收到Ok(1)的消息内容,紧接着子线程结束,发送者也随着被drop,此时接收者又会报错,但是这次错误原因有所不同:Disconnected代表发送者已经被关闭。
使用通道传输数据同样要遵守rust的所有权机制
使用通道来传输数据,一样要遵循 Rust 的所有权规则:
- 若值的类型实现了Copy特征,则直接复制一份该值,然后传输过去,例如之前的i32类型
- 若值没有实现Copy,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
使用 for 进行循环接收
下面来看看如何连续接收通道中的值:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
在上面代码中,主线程和子线程是并发运行的,子线程在不停的发送消息 -> 休眠 1 秒,与此同时,主线程使用for循环阻塞的从rx迭代器中接收消息,当子线程运行完成时,发送者tx会随之被drop,此时for循环将被终止,最终main线程成功结束。
多发送者和单接受者
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone(); // 需要clone一份tx,一个子线程拿走tx的所有权,另一个子线程拿走tx1的所有权
// 发送者
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});
// 发送者
thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
});
// 接受者
for received in rx {
println!("Got: {}", received);
}
}
- 需要所有的发送者都被drop掉后,接收者rx才会收到错误,进而跳出for循环,最终结束主线程
- 这里虽然用了clone但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次
- 由于两个子线程谁先创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定。
- 对于通道而言,消息的发送顺序和接收顺序是一致的,满足FIFO原则(先进先出)。
同步通道和异步通道
-
异步通道
之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞。
-
同步通道
与异步通道相反,同步通道发送消息是阻塞的,只有在消息被接收后才解除阻塞。例如:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx)= mpsc::sync_channel(0); // 同步通道 let handle = thread::spawn(move || { println!("发送之前"); tx.send(1).unwrap(); println!("发送之后"); }); println!("睡眠之前"); thread::sleep(Duration::from_secs(3)); println!("睡眠之后"); println!("receive {}", rx.recv().unwrap()); handle.join().unwrap(); }
主线程由于睡眠被阻塞导致无法接收消息,因此子线程的发送也一直被阻塞,直到主线程结束睡眠并成功接收消息后,发送才成功:发送之后的输出是在receive 1之后,说明只有接收消息彻底成功后,发送消息才算完成。
我们在创建同步通道的时候,使用了sync_channel,并传递了参数0,这意味着该通道中无法缓存消息。如果我们传递1作为参数,那么通道可以缓存1个消息。当你设定为N时,发送者就可以无阻塞的往通道中发送N条消息,当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么第N+1条消息就将触发发送阻塞)。
异步通道缓冲上限取决于你的内存大小。因此,使用异步消息虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险。
关闭通道
所有发送者被drop或者所有接收者被drop后,通道会自动关闭。因此需要注意下面的代码:
use std::sync::mpsc;
fn main() {
use std::thread;
let (send, recv) = mpsc::channel();
let num_threads = 3;
for i in 0..num_threads {
let thread_send = send.clone();
thread::spawn(move || {
thread_send.send(i).unwrap();
println!("thread {:?} finished", i);
});
}
for x in recv {
println!("Got: {}", x);
}
println!("finished iterating");
}
这段代码看起来没有问题。但是一旦运行它,就会发现,程序不会结束。而这正是因为通道没有正确关闭。通道的关闭条件是当发送方全部被drop或者接收方全部被drop。这个例子中正是由于发送方send没有被drop,而导致通道没有关闭,而陷入了阻塞状态。我们可以drop掉send,让程序正常结束。
use std::sync::mpsc;
fn main() {
use std::thread;
let (send, recv) = mpsc::channel();
let num_threads = 3;
for i in 0..num_threads {
let thread_send = send.clone();
thread::spawn(move || {
thread_send.send(i).unwrap();
println!("thread {:?} finished", i);
});
}
drop(send); // drop掉send,这样所有的发送端都被drop掉了,通道会关闭。
sleep(Duration::from_secs(1));
for x in recv { // 关闭以后的通道,接收端可以读取数据,而发送端不能再发送数据。
println!("Got: {}", x);
}
println!("finished iterating");
}
在主线程中,我们特意sleep了1秒,让发送端先发送数据,然后drop所有的发送端。此时通道关闭。关闭以后的通道,接收端可以读取数据。下面,再来看一个接收方被drop掉,通道关闭的示例。
use std::{sync::mpsc, thread::sleep, time::Duration};
fn main() {
use std::thread;
let (send, recv) = mpsc::channel();
let num_threads = 3;
for i in 0..num_threads {
let thread_send = send.clone();
thread::spawn(move || {
thread_send.send(i).unwrap();
println!("thread {:?} finished", i);
});
}
sleep(Duration::from_secs(1));
drop(recv);
send.send(1).unwrap();
}
sleep的目的是让我们for循环中创建的三个线程能够先执行,往通道中写数据。然后在主线程中,我们直接drop掉接受方,此时通道已关闭。我们尝试向通道中写入数据,就会发生错误。通道关闭的时候,发送端无法发送数据。
单发送者和多接受者
这个问题比较好转化。实际上就是一个发送线程,多个接收线程。那么有多少个接收线程,创建多少个通道即可。例如:
use std::{sync::mpsc::{self, Sender, Receiver}, thread};
fn sender(sends: [Sender<i32>; 3]) {
for i in 0..=3 {
for send in &sends {
send.send(i).unwrap();
}
}
}
fn receiver1(recv: Receiver<i32>) {
for re in recv {
println!("receive1 {re}, do something 1");
}
}
fn receiver2(recv: Receiver<i32>) {
for re in recv {
println!("receive2 {re}, do something 2");
}
}
fn receiver3(recv: Receiver<i32>) {
for re in recv {
println!("receive3 {re}, do something 3");
}
}
#[allow(unused_must_use)]
fn main() {
// 创建3组通道
let (send1, recv1) = mpsc::channel();
let (send2, recv2) = mpsc::channel();
let (send3, recv3) = mpsc::channel();
let sends = [send1, send2, send3];
// 单发送者,向多个接受者发送消息
sender(sends);
// 多接收者
let hd1 = thread::spawn(move || receiver1(recv1));
let hd2 = thread::spawn(move || receiver2(recv2));
let hd3 = thread::spawn(move || receiver3(recv3));
// 等待接收者线程执行完毕
hd1.join();
hd2.join();
hd3.join();
}
多发送者和多接收者
在单发送者,多接收者的基础上,增加多个发送者即可构造出多发送者,多接收者的模式。一般这种情形可能比较少见。
use std::{sync::mpsc::{self, Sender, Receiver}, thread};
fn sender(sends: [Sender<i32>; 3]) {
for i in 0..=3 {
for send in &sends {
send.send(i).unwrap();
}
}
}
fn sender2(sends: [Sender<i32>; 3]){
for i in 0..=3 {
for send in &sends {
send.send(i+3).unwrap();
}
}
}
fn receiver1(recv: Receiver<i32>) {
for re in recv {
println!("receive1 {re}, do something 1");
}
}
fn receiver2(recv: Receiver<i32>) {
for re in recv {
println!("receive2 {re}, do something 2");
}
}
fn receiver3(recv: Receiver<i32>) {
for re in recv {
println!("receive3 {re}, do something 3");
}
}
#[allow(unused_must_use)]
fn main() {
// 创建3组通道
let (send1, recv1) = mpsc::channel();
let (send2, recv2) = mpsc::channel();
let (send3, recv3) = mpsc::channel();
let sends = [send1, send2, send3];
let sends2 = sends.clone(); // 复制发送者
// 多发送者,向多个接收者发送消息
sender(sends);
sender2(sends2);
// 多接收者,接收从多个发送者来的消息
let hd1 = thread::spawn(move || receiver1(recv1));
let hd2 = thread::spawn(move || receiver2(recv2));
let hd3 = thread::spawn(move || receiver3(recv3));
// 等待接收者线程执行完毕
hd1.join();
hd2.join();
hd3.join();
}
第三方库
- crossbeam-channel, 老牌强库,功能较全,性能较强,之前是独立的库,但是后面合并到了crossbeam主仓库中
- flume, 官方给出的性能数据某些场景要比 crossbeam 更好些
线程同步
rust不仅提供了消息传递方式来实现线程同步,而且也提供了共享内存的方式来实现同步。我们可以通过锁和原子操作来实现多线程同步。
该如何选择
共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:
- 共享内存相对消息传递能节省多次内存拷贝的成本
- 共享内存的实现简洁的多
- 共享内存的锁竞争更多
消息传递适用的场景很多,我们下面列出了几个主要的使用场景:
- 需要可靠和简单的(简单不等于简洁)实现时
- 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
- 需要一个任务处理流水线(管道)时,等等
而使用共享内存(并发原语)的场景往往就比较简单粗暴:需要简洁的实现以及更高的性能时。
互斥锁 Mutex
既然是共享内存,那并发原语自然是重中之重,先来一起看看皇冠上的明珠: 互斥锁Mutex(mutual exclusion 的缩写)。
Mutex让多个线程并发的访问同一个值变成了排队访问:同一时间,只允许一个线程A访问该值,其它线程需要等待A访问完成后才能继续。例如:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Mutex是一个智能指针。这里使用Mutex::new(0)创建了一个互斥锁实例,0是该互斥锁中的数据。当我们需要访问互斥锁中的数据时,需要使用lock()方法来获取锁,该方法会阻塞当前线程,直到获取到锁。获取到锁以后,才能对数据进行修改。互斥锁保证了多个线程同时访问该数据时,只有一个线程能获取到锁,其它线程只能阻塞着等待,这样就保证了数据能被安全的修改!
m.lock()方法也有可能报错,例如当前正在持有锁的线程panic了。在这种情况下,其它线程不可能再获得锁,因此lock方法会返回一个错误。
由于Mutex是一个智能指针,通常而言,无需释放锁,只需要把控好锁的作用域即可,例如上面的代码中num在线程执行完毕就会被释放,因此无需手动释放锁。当然也可以手动释放锁,例如:
use std::sync::Mutex;
fn main() {
let name = Mutex::new("张三".to_string());
// 模拟某个函数对name进行了修改
let mut s = name.lock().unwrap();
s.clear();
s.push_str("李四");
drop(s); // 手动drop掉锁
let mut s = name.lock().unwrap();
println!("{}", s); // 输出s
s.clear();
s.push_str("王五");
println!("{}", s); // 输出name
}
这只是个演示,没有太大的实际意义。在这个示例中Mutex<T>
可以支持修改内部数据,当结合Arc<T>
一起使用时,可以实现多线程的内部可变性。
使用Mutex时需要注意的问题
-
Mutex是互斥锁,因此在使用数据前必须先获取锁;在数据使用完成后,必须及时的释放锁。(建议手动drop,或者加上{}来限制锁的范围,不然可能并发能力会受到严重影响)
-
另外一个问题就是“死锁”。这个问题可能很难避免。下面是个单线程中死锁的例子。
use std::sync::Mutex; fn main() { let data = Mutex::new(0); let d1 = data.lock(); let d2 = data.lock(); } // d1锁在此处释放
只要你在另一个锁还未被释放时去申请新的锁,就会触发,当代码复杂后,这种情况可能就没有那么显眼。
多线程死锁,当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁时,就可能发生死锁。关于死锁更多的信息,可以参考这篇文章。
下面是一个例子。
use std::{thread::{self, sleep}, sync::{Mutex, Arc}, time::Duration}; fn doing(i: i32, printer: Arc<Mutex<&str>>, scanner: Arc<Mutex<&str>>) { println!("我是{i}线程"); let _a = printer.lock().unwrap(); println!("{i}正在打印东西..."); sleep(Duration::from_secs(3)); let _b = scanner.lock().unwrap(); println!("{i}正在扫描文件..."); sleep(Duration::from_secs(3)); println!("{i}线程结束"); } fn doing2(i: i32, printer: Arc<Mutex<&str>>, scanner: Arc<Mutex<&str>>) { println!("我是{i}线程"); let _b = scanner.lock().unwrap(); println!("{i}正在扫描文件..."); sleep(Duration::from_secs(3)); let _a = printer.lock().unwrap(); println!("{i}正在打印东西..."); sleep(Duration::from_secs(3)); println!("{i}线程结束"); } fn main() { let printer = Arc::new(Mutex::new("打印机")); let scanner = Arc::new(Mutex::new("扫描仪")); let mut handles = vec![]; for i in 0..2 { let p = Arc::clone(&printer); let s = Arc::clone(&scanner); if i == 1 { let hd = thread::spawn(move || doing(i, p, s)); handles.push(hd); }else { let hd = thread::spawn(move || doing2(i, p, s)); handles.push(hd); } } for hd in handles { let _ = hd.join(); } }
在这个例子中,我们没有及时的释放资源导致了死锁的发生。doing拿不到扫描仪的锁,而doing2拿不到打印机的锁。我们可以在使用完资源以后立即释放锁。
use std::{thread::{self, sleep}, sync::{Mutex, Arc}, time::Duration}; fn doing(i: i32, printer: Arc<Mutex<&str>>, scanner: Arc<Mutex<&str>>) { println!("我是{i}线程"); let _a = printer.lock().unwrap(); println!("{i}正在打印东西..."); sleep(Duration::from_secs(3)); drop(_a); // 立即释放锁 let _b = scanner.lock().unwrap(); println!("{i}正在扫描文件..."); sleep(Duration::from_secs(3)); println!("{i}线程结束"); } fn doing2(i: i32, printer: Arc<Mutex<&str>>, scanner: Arc<Mutex<&str>>) { println!("我是{i}线程"); let _b = scanner.lock().unwrap(); println!("{i}正在扫描文件..."); drop(_b); // 立即释放锁 sleep(Duration::from_secs(3)); let _a = printer.lock().unwrap(); println!("{i}正在打印东西..."); sleep(Duration::from_secs(3)); println!("{i}线程结束"); }
try_lock
与lock方法不同,try_lock会尝试去获取一次锁,如果无法获取会返回一个错误,因此不会发生阻塞。我们将上面必定发生死锁的代码中的lock改为try_lock。
use std::{thread::{self, sleep}, sync::{Mutex, Arc}, time::Duration};
fn doing(i: i32, printer: Arc<Mutex<&str>>, scanner: Arc<Mutex<&str>>) {
println!("我是{i}线程");
let _a = printer.try_lock().unwrap();
println!("{i}正在打印东西...");
sleep(Duration::from_secs(3));
drop(_a); // 立即释放锁
let _b = scanner.try_lock().unwrap();
println!("{i}正在扫描文件...");
sleep(Duration::from_secs(3));
println!("{i}线程结束");
}
fn doing2(i: i32, printer: Arc<Mutex<&str>>, scanner: Arc<Mutex<&str>>) {
println!("我是{i}线程");
let _b = scanner.try_lock().unwrap();
println!("{i}正在扫描文件...");
drop(_b); // 立即释放锁
sleep(Duration::from_secs(3));
let _a = printer.try_lock().unwrap();
println!("{i}正在打印东西...");
sleep(Duration::from_secs(3));
println!("{i}线程结束");
}
fn main() {
let printer = Arc::new(Mutex::new("打印机"));
let scanner = Arc::new(Mutex::new("扫描仪"));
let mut handles = vec![];
for i in 0..2 {
let p = Arc::clone(&printer);
let s = Arc::clone(&scanner);
if i == 1 {
let hd = thread::spawn(move || doing(i, p, s));
handles.push(hd);
}else {
let hd = thread::spawn(move || doing2(i, p, s));
handles.push(hd);
}
}
for hd in handles {
let _ = hd.join();
}
}
当try_lock失败时,会报出一个错误thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "WouldBlock"'
,接着线程中的剩余代码会继续执行,不会被阻塞。
读写锁 RwLock
Mutex会对每次读写都进行加锁,但某些时候,我们需要大量的并发读,Mutex就无法满足需求了,此时就可以使用RwLock:
use std::sync::RwLock;
fn main() {
let lock = RwLock::new(5);
// 同一时间允许多个读
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读锁在此处被drop
// 同一时间只允许一个写
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
// 以下代码会panic,因为读和写不允许同时存在
// 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
// let r1 = lock.read();
// println!("{:?}",r1);
}// 写锁在此处被drop
}
RwLock在使用上和Mutex区别不大,需要注意的是,当读写同时发生时,程序会直接panic。读锁是共享的,因为它允许多个线程同时读取数据,但写锁是独占的,因为只有一个线程可以写入数据。并且写的时候不允许读。Rust中的Rwlock是基于操作系统提供的原语实现的,性能未必比Mutex好。关于性能,benchmark 永远是你在迷茫时最好的朋友!
条件变量
Mutex用于解决资源安全访问的问题,但是我们还需要一个手段来解决资源访问顺序的问题。而 Rust 考虑到了这一点,为我们提供了条件变量(Condition Variables),它经常和Mutex一起使用,可以让线程挂起,直到某个条件发生后再继续执行。条件变量一般用于以下场景。
- 共享队列:多个线程需要读取或写入一个共享队列时,可以使用条件变量来通知读取线程在队列非空时进行读取,通知写入线程在队列未满时进行写入。
- 生产者-消费者模型:多个生产者线程和消费者线程需要共享一个缓冲区时,可以使用条件变量来通知消费者线程在缓冲区非空时进行读取,通知生产者线程在缓冲区未满时进行写入。
- 线程池:多个任务需要在一个线程池中执行时,可以使用条件变量来通知空闲线程在有新任务到达时进行执行。
- 计算任务:多个线程需要协作完成一个大型计算任务时,可以使用条件变量来控制每个线程的执行顺序和进度,以避免竞争和死锁。
上述的这些场景也可以通过其它手段来实现,例如,通道实际上就是一个生产者-消费者模型;而计算任务的协同也可以通过线程屏障来解决。线程池一般不需要我们自己实现,而是使用成熟,稳定的第三方库。因此,我们使用条件变量的场景可能没有那么多。关于条件变量的使用可以参考官方文档以及POSIX中条件变量。
信号量 Semaphore
在多线程中,另一个重要的概念就是信号量,使用它可以让我们精准的控制当前正在运行的任务最大数量。本来 Rust 在标准库中有提供一个信号量实现, 但是由于各种原因这个库现在已经不再推荐使用了,因此我们推荐使用tokio中提供的Semaphore实现: tokio::sync::Semaphore。(也可使用带缓冲的通道来实现信号量机制)。以下是tokio中使用信号量的方式
use std::sync::Arc;
use tokio::sync::Semaphore;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_handles.push(tokio::spawn(async move {
//
// 在这里执行任务...
//
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}
}
上面代码创建了一个容量为 3 的信号量,当正在执行的任务超过 3 时,剩下的任务需要等待正在执行任务完成并减少信号量后到 3 以内时,才能继续执行。这里的关键其实说白了就在于:信号量的申请和归还,使用前需要申请信号量,如果容量满了,就需要等待;使用后需要释放信号量,以便其它等待者可以继续。因此,根据这个思路,你也可以实现自己的信号量。
线程同步:Atomic 原子类型与内存顺序
从 Rust1.34 版本后,就正式支持原子类型。原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。
由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。
std::sync::atomic包中仅提供了数值类型的原子操作:AtomicBool, AtomicIsize, AtomicUsize, AtomicI8, AtomicU16等。如果你需要的不是数值类型,那么就使用锁,而不是原子类型。原子类型通常我们可能在以下场景使用。
- 无锁(lock free)数据结构
- 全局变量,例如全局自增 ID, 在后续章节会介绍
- 跨线程计数器,例如可以用于统计指标
use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
R.fetch_add(1, Ordering::Relaxed);
}
})
}
fn main() {
let s = Instant::now();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
println!("{:?}",Instant::now().sub(s));
}
以上代码启动了数个线程,每个线程都在疯狂对全局变量进行加 1 操作, 最后将它与线程数 * 加1次数进行比较,如果发生了因为多个线程同时修改导致了脏数据,那么这两个必将不相等。好在,它没有让我们失望,不仅快速的完成了任务,而且保证了 100%的并发安全性。还有一点值得注意: 和Mutex一样,Atomic的值具有内部可变性,你无需将其声明为mut。
内存顺序
内存顺序是指 CPU 在访问内存时的顺序,该顺序可能受以下因素的影响:
- 代码中的先后顺序
- 编译器优化导致在编译阶段发生改变(内存重排序 reordering)
- 运行阶段因 CPU 的缓存机制导致顺序被打乱
-
编译器优化导致内存顺序的改变
static mut X: u64 = 0; static mut Y: u64 = 1; fn main() { ... // A unsafe { ... // B X = 1; ... // C Y = 3; ... // D X = 2; ... // E } }
假如在C和D代码片段中,根本没有用到X = 1,那么编译器很可能会将X = 1和X = 2进行合并:
... // A unsafe { ... // B X = 2; ... // C Y = 3; ... // D ... // E }
若代码A中创建了一个新的线程用于读取全局静态变量X,则该线程将无法读取到X = 1的结果,因为在编译阶段就已经被优化掉。
-
CPU 缓存导致的内存顺序的改变
假设之前的X = 1没有被优化掉,并且在代码片段A中有一个新的线程:initial state: X = 0, Y = 1 THREAD Main THREAD A X = 1; if X == 1 { Y = 3; Y *= 2; X = 2; }
我们来讨论下以上线程状态,Y最终的可能值(可能性依次降低):
- Y = 3: 线程Main运行完后才运行线程A,或者线程A运行完后再运行线程Main
- Y = 6: 线程Main的Y = 3运行完,但X = 2还没被运行, 此时线程 A 开始运行Y *= 2, 最后才运行Main线程的X = 2
- Y = 2: 线程Main正在运行Y = 3还没结束,此时线程A正在运行Y *= 2, 因此Y取到了值 1,然后Main的线程将Y设置为 3, 紧接着就被线程A的Y = 2所覆盖
- Y = 2: 上面的还只是一般的数据竞争,这里虽然产生了相同的结果2,但是背后的原理大相径庭: 线程Main运行完Y = 3,但是 CPU 缓存中的Y = 3还没有被同步到其它 CPU 缓存中,此时线程A中的Y *= 2就开始读取Y,结果读到了值1,最终计算出结果2
甚至更改成:
initial state: X = 0, Y = 1 THREAD Main THREAD A X = 1; if X == 2 { Y = 3; Y *= 2; X = 2; }
还是可能出现Y = 2,因为Main线程中的X和Y被同步到其它 CPU 缓存中的顺序未必一致。
限定内存顺序的 5 个规则
在理解了内存顺序可能存在的改变后,你就可以明白为什么 Rust 提供了Ordering::Relaxed用于限定内存顺序了,事实上,该枚举有 5 个成员:- Relaxed, 这是最宽松的规则,它对编译器和 CPU 不做任何限制,可以乱序
- Release 释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面
- Acquire 获取, 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和Release在不同线程中联合使用
- AcqRel, 是 Acquire 和 Release 的结合,同时拥有它们俩提供的保证。比如你要对一个 atomic 自增 1,同时希望该操作之前和之后的读取或写入操作不会被重新排序
- SeqCst 顺序一致性, SeqCst就像是AcqRel的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到SeqCst的原子操作,线程中该SeqCst操作前的数据操作绝对不会被重新排在该SeqCst操作之后,且该SeqCst操作后的数据操作也绝对不会被重新排在SeqCst操作前。
这些规则由于是系统提供的,因此其它语言提供的相应规则也大同小异,大家如果不明白可以看看其它语言的相关解释。
内存顺序的选择
不知道怎么选择时,优先使用SeqCst,虽然会稍微减慢速度,但是慢一点也比出现错误好
多线程只计数fetch_add而不使用该值触发其他逻辑分支的简单使用场景,可以使用Relaxed
多线程中使用 Atomic
在多线程环境中要使用Atomic需要配合Arc:
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{hint, thread};
fn main() {
let spinlock = Arc::new(AtomicUsize::new(1));
let spinlock_clone = Arc::clone(&spinlock);
let thread = thread::spawn(move|| {
spinlock_clone.store(0, Ordering::SeqCst);
});
// 等待其它线程释放锁
while spinlock.load(Ordering::SeqCst) != 0 {
hint::spin_loop();
}
if let Err(panic) = thread.join() {
println!("Thread had an error: {:?}", panic);
}
}
实际上,对于大多数使用者而言,很少会用到Atomic,通常在高性能的库开发的时候才会使用它。这里只是做一个简单的介绍,等使用到的时候在仔细研究如何使用。
基于 Send 和 Sync 的线程安全
Send和Sync是 Rust 安全并发的重中之重,但是实际上它们只是标记特征(marker trait,该特征未定义任何行为,因此非常适合用于标记), 来看看它们的作用:
- 实现Send的类型可以在线程间安全的传递其所有权
- 实现Sync的类型可以在线程间安全的共享(通过引用)
这里还有一个潜在的依赖:一个类型要在线程间安全的共享的前提是,指向它的引用必须能在线程间传递。因为如果引用都不能被传递,我们就无法在多个线程间使用引用去访问同一个数据了。
由上可知,若类型 T 的引用&T是Send,则T是Sync。
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
首先RwLock可以在线程间安全的共享,那它肯定是实现了Sync,但是我们的关注点不在这里。众所周知,RwLock可以并发的读,说明其中的值T必定也可以在线程间共享,那T必定要实现Sync。
果不其然,上述代码中,T的特征约束中就有一个Sync特征,那问题又来了,Mutex是不是相反?再来看看:
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
不出所料,Mutex<T>
中的T并没有Sync特征约束。
Arc和Rc
前面说过,Rc无法在线程间安全的转移。而Arc可以在线程之间安全的转移。实际上就是因为Send和Sync特征。
// Rc源码片段
impl<T: ?Sized> !marker::Send for Rc<T> {}
impl<T: ?Sized> !marker::Sync for Rc<T> {}
// Arc源码片段
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}
!代表移除特征的相应实现,上面代码中Rc<T>
的Send和Sync特征被特地移除了实现,而Arc<T>
则相反,实现了Sync + Send.
实现Send和Sync的类型
在 Rust 中,几乎所有类型都默认实现了Send和Sync,而且由于这两个特征都是可自动派生的特征(通过derive派生),意味着一个复合类型(例如结构体), 只要它内部的所有成员都实现了Send或者Sync,那么它就自动实现了Send或Sync。
当然,如果是自定义的复合类型:只要复合类型中有一个成员不是Send或Sync,那么该复合类型也就不是Send或Sync。
为裸指针实现Sync和Send
- 实现Send
为MyBox实现了Send trait之后,它就可以在多线程之间进行所有权的转移了,否则不行。use std::thread; #[derive(Debug)] struct MyBox(*mut u8); unsafe impl Send for MyBox {} fn main() { let p = MyBox(5 as *mut u8); let t = thread::spawn(move || { println!("{:?}",p); }); t.join().unwrap(); }
- 实现Sync
为MyBox实现了Sync之后,就可以配合Arc在多线程中传递MyBox的不可变引用了。use std::thread; use std::sync::Arc; use std::sync::Mutex; #[derive(Debug)] struct MyBox(*const u8); unsafe impl Sync for MyBox {} fn main() { let b = &MyBox(5 as *const u8); let v = Arc::new(Mutex::new(b)); let t = thread::spawn(move || { let _v1 = v.lock().unwrap(); }); t.join().unwrap(); }