您现在的位置是:首页 >技术教程 >锁与信号量的深度总结与实现【c++与golang】网站首页技术教程
锁与信号量的深度总结与实现【c++与golang】
前言
线程同步和互斥是多线程编程中的两个重要概念,它们都是为了解决多线程环境下的资源共享和访问问题。
**线程同步:**线程同步是指多个线程按照一定的顺序执行,以确保在访问共享资源时,不会出现数据不一致的问题。同步机制主要包括条件变量和信号量。
**线程互斥:**线程互斥是指在同一时刻,只允许一个线程访问共享资源,以防止数据不一致和竞争条件。互斥锁是实现线程互斥的主要工具。
在操作系统中,锁、条件变量和信号量都是用来实现线程同步与互斥的机制。
锁(Lock)是一种互量,用于保护共享资源,防止多个线程同时访问。当一个线程获得锁时,其他线程必须等待该线程释放锁后才能访问共享资源。常见的锁包括互斥锁和读写锁。
条件变量(Condition Variable)是一种线程间通信的机制,用于等待某个条件的发生。当一个线程等待某个条件时,它会阻塞并释放锁,直到另一个线程发出信号通知该条件已经满足,该线程才会被唤醒并重新获得锁。
信号量(Semaphore)是一种计数器用于控制多个线程对共享资源的访问。当一个线程访问共享资源时它会尝试获取信号量,如果号量的值大于 0,则该线程可以访问共享资源并将信号量的值减 1;如果信号量的值等于 0,则该线程必须等待其他线程释放信号量后才能访问共享资源。
本文分别用c++与golang来使用各种锁,条件变量、信号量来实现线程间的同步和互斥。
一、互斥锁和自旋锁
与单线程编程不同的是,在多线程编程中,多个线程之间可能会同时访问同一个变量、函数或对象等共享资源,因此需要采取措施来避免数据竞争等并发问题。
在 C++ 中,mutex 是一个同步原语,用于协调不同线程的访问和修改。C++11 引入了标准库中的 std::mutex 类,它提供了 lock() 和 unlock() 两个函数来控制互斥锁的状态。
std::mutex 的本质是一个操作系统提供的锁,它分为用户模式锁和内核模式锁。当线程试图去获取锁时,如果锁当前没有被其它线程持有,则该线程将获得这个锁,并且可以继续执行。否则,该线程将进入阻塞状态,直到锁被释放为止。
**在 Go 中,sync.Mutex 是互斥锁的实现。**它有两个方法:Lock 和 Unlock,用于控制互斥锁的状态。sync.Mutex 的底层实现是基于操作系统提供的 futex(fast userspace mutex)机制,它使得线程在获取锁时可以避免进入内核态,并且能够更加高效地进行线程上下文切换。这使得 Go 中的互斥锁在资源竞争较小、并发度较高的情况下有着良好的性能表现。
c++版本
我们首先定义了一个共享变量 num,并在 AddWithOutLock 和 AddWithLock 两个函数中分别对其进行累加操作。其中,AddWithOutLock 是不加锁的版本,而 AddWithLock 是使用互斥锁保护的版本。
在 main 函数中,我们使用 emplace_back 函数向线程数组 threads 中添加子线程对象,并分别传入 AddWithOutLock 和 AddWithLock 函数来创建十个不加锁和加锁的线程。接着,我们使用 join 函数阻塞主线程,等待所有线程执行完毕,并输出累加结果
mutex mtx;
int num = 0;
void AddWithLock() {
for (int i = 0; i < 2000; i++) {
mtx.lock();
num++;
mtx.unlock();
}
}
void AddWithOutLock() {
for (int i = 0; i < 2000; i++) {
num++;
}
}
int main() {
vector<thread> threads; // 创建线程数组
for (int i = 0; i < 10; i++) {
threads.emplace_back(
AddWithOutLock); // 用 emplace_back 函数向数组添加元素
}
for (auto& t : threads) // 遍历线程数组
{
t.join(); // 阻塞当前线程,等待子线程结束
}
cout << "AddWithOutLock: " << num << endl;
// 加锁版测试
num = 0; // 重置 num 值
threads.clear(); // 清空线程数组
for (int i = 0; i < 10; i++) {
threads.emplace_back(AddWithLock);
}
for (auto& t : threads) {
t.join();
}
cout << "AddWithLock: " << num << endl;
return 0;
}
从运行结果可以看出,不加锁的版本出现了数据错误,而加锁的版本则能够正确地累加结果,说明使用互斥锁确实可以有效避免并发问题。
golang版本
在 main 函数中,我们创建了十个 AddWithOutLock 和 AddWithLock 的 goroutine,并且使用 wg.Add 和 wg.Wait 方法控制并发,保证所有 goroutine 都执行完毕后再输出结果。在 Goroutine 中,和C++代码一样,我们使用 sync.Mutex 来实现互斥锁的功能,保证 num 变量的线程安全。
var mtx sync.Mutex
var wg sync.WaitGroup
var num int
func AddWithLock() {
defer wg.Done()
for i:=0;i<2000;i++{
mtx.Lock()
num++
mtx.Unlock()
}
}
func AddWithOutLock() {
defer wg.Done()
for i:=0;i<2000;i++{
num++
}
}
func main() {
wg.Add(10)
for i:=0;i<10;i++{
go AddWithOutLock()
}
wg.Wait()
fmt.Println("AddWithOutLock:",num)
num=0
wg.Add(10)
for i:=0;i<10;i++{
go AddWithLock()
}
wg.Wait()
fmt.Println("AddWithLock:",num)
}
从运行结果可以看出,不加锁的版本出现了数据错误,而加锁的版本则能够正确地累加结果,说明使用互斥锁确实可以有效避免并发问题。
总结:
至此,本文已经介绍了如何使用互斥锁确保多线程对共享变量的安全访问,同时给出了 C++ 和 Go 两种语言的示例代码。虽然语言不同,但是实现的基本原理是相同的。在编写多线程程序时,无论是 C++、Go 还是其他语言,都需要注意多个线程之间的共享资源访问协调,使用适当的同步机制保证线程安全。
二、条件变量
条件变量(Condition Variable)是一种用于实现线程间同步的原语。条件变量允许线程等待某个条件的满足,当条件满足时,其他线程会通知等待的线程。条件变量通常与互斥锁(Mutex)一起使用,以保护共享资源的访问和同步。当一个线程需要等待某个条件成立时,它可以调用条件变量的等待函数将自己阻塞,同时释放持有的互斥锁,当条件成立时,另一个线程可以调用条件变量的通知函数唤醒等待线程。
在 C++ 中,我们可以通过 std::condition_variable 类的 wait() 方法等待条件满足。当条件不满足时,wait() 方法会阻塞当前线程,并释放该条件变量绑定的锁;当条件满足时,wait() 方法会重新获得该锁并返回。
同时,使用 notify_one() 或 notify_all() 方法来通知等待在该条件变量上的线程继续执行。
在golang中,使用sync.Cond可以实现条件变量。func (c Cond) Wait()阻塞等待条件变量满足,释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()func (c *Cond) Signal():单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知【一般都使用这个】
c++版本
互斥量 mtx 用于保护共享数据 product,而条件变量 cond 用于实现线程同步和通信。生产者线程在生产随机数并将其放入缓冲区之前**,首先需要获取互斥量 mtx 的锁,以防止其他线程访问共享数据 product。如果缓冲区已经满了,生产者线程需要调用 cond.wait(lk) 进行等待,此时会自动释放互斥量 mtx 的锁,并阻塞当前线程,等待其他线程发出通知**。当缓冲区有空位时,生产者线程会产生一个随机数并将其放入缓冲区,然后解锁互斥量 mtx 并通知所有等待的线程,即 cond.notify_all()。
消费者线程的实现与生产者线程类似,也需要先获取互斥量 mtx 的锁,并在缓冲区为空时等待其他线程发出通知。当缓冲区中有数据可供消费时,消费者线程会从缓冲区中取出一个随机数进行消费,然后解锁互斥量 mtx 并通知所有等待的线程。
std::queue<int> product;
std::mutex mtx;
std::condition_variable cond;
void producer(int id) {
while (true) {
std::unique_lock<std::mutex> lk(mtx); // 加锁
while (product.size() == 5) { // 缓冲区满了,等待消费
cond.wait(lk);
}
int num = rand() % 1000;
product.push(num);
std::cout << "生产者:" << id << " 生产: " << num << std::endl;
lk.unlock(); // 解锁
cond.notify_all(); // 通知所有等待的线程
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lk(mtx); // 加锁
while (product.empty()) { // 缓冲区为空,等待生产
cond.wait(lk);
}
int num = product.front();
product.pop();
std::cout << "消费者: " << id << " 消费: " << num << std::endl;
lk.unlock(); // 解锁
cond.notify_all(); // 通知所有等待的线程
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
int main() {
srand(time(nullptr));
for (int i = 0; i < 5; ++i) {
std::thread t(producer, i + 1);
t.detach();
}
for (int i = 0; i < 5; ++i) {
std::thread t(consumer, i + 1);
t.detach();
}
while (true) {
std::this_thread::yield();
}
return 0;
}
golang版本
当生产者发现缓冲区已经满了时,它会调用 cond.Wait() 方法来释放锁,并等待消费者唤醒它;当消费者发现缓冲区为空时,它也会调用 cond.Wait() 方法来释放锁,并等待生产者唤醒它。当有新的数据被生产或消费时,生产者或消费者会调用 cond.Signal() 或 cond.Broadcast() 方法来唤醒等待的生产者或消费者继续执行。
var cond sync.Cond //定义全局条件变量
func producer(out chan<-int,idx int){
for {
//先加锁
cond.L.Lock()
//判断缓冲区是否满
for len(out) == cap(out){
cond.Wait() //1,zuse 2 shifangsuo 3 huanxingjiasuo
}
num:= rand.Intn(999)
out<- num
fmt.Println("生产者:",idx,"生产:", num)
//访问公共区结束,并且打印结束,解锁
cond.L.Unlock()
//唤醒对端
cond.Signal()
time.Sleep(time.Millisecond*300)
}
}
func consumer(in <-chan int,idx int){
for {
cond.L.Lock()
//判断缓冲区是否为空
for len(in)== 0{
cond.Wait()
}
num := <-in
fmt.Println("消费者:",idx,"消费:",num)
//访问公共区结束后,解锁
cond.L.Unlock() //锁的力度越小越好
cond.Signal()
time.Sleep(time.Millisecond*300)
}
}
func main(){
rand.Seed(time.Now().UnixNano())
product := make(chan int,5)
//指定条件变量 使用的锁
cond.L = new(sync.Mutex)//互斥锁初值为0,是未加锁状态
for i:=0;i<5;i++{
go producer(product,i+1)
}
for i:=0;i<5;i++{
go consumer(product,i+1)
}
for{
;
}
}
三、读写锁
读写锁与普通的互斥锁不同,它允许多个线程同时持有读锁,并且读取操作不会产生数据竞争,因此多个读取操作可以并发执行,提高了程序的并发性。而在写入操作时,为了保证数据的一致性,只能有一个线程持有写锁,其他的线程需要等待写锁被释放之后再进行操作。
在 C++ 中,可以使用标准库中的 std::shared_mutex 类型来实现读写锁。std::shared_mutex 类型提供了 lock_shared、unlock_shared、lock 和 unlock 等方法,可以方便地实现读写锁。但是在C++17之前没有 std::shared_mutex,本文采用 std::mutex 与 std::condition_variable 的方式实现读写锁。
在 Go 中,读写锁可以使用标准库中的 sync.RWMutex 类型来实现。sync.RWMutex 类型提供了 RLock、RUnlock、Lock 和 Unlock 等方法,可以方便地实现读写锁。
c++版本
这一部分在前面的文章中已经详细介绍过了。
c++实利用互斥锁与条件变量现读写锁
golang版本
定义了一个 Counter 结构体,并提供了 Increment、IncrementWithRW、ReadWithoutRW 和 ReadWithRW 四个方法,分别实现了有读写锁与普通锁的计数器递增和读取操作。
在 runWithoutRW 函数中,我们使用互斥锁实现了计数器的增加操作,在 runWithRW 函数中,则采用读写锁实现对计数器的并发访问操作。
package main
import (
"fmt"
"sync"
"time"
)
const (
NumRoutines = 100 // 协程数量
NumIncrements = 10000 // 递增次数
NumReads = 50000 // 读取次数
)
type Counter struct {
mu sync.Mutex
rw sync.RWMutex
count int
}
func (c *Counter) Increment() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
func (c *Counter) IncrementWithRW() {
c.rw.Lock()
c.count++
c.rw.Unlock()
}
func (c *Counter) ReadWithRW() int {
c.rw.RLock()
defer c.rw.RUnlock()
return c.count
}
func (c *Counter) ReadWithoutRW() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func runWithoutRW() {
var wg sync.WaitGroup
c := Counter{}
start := time.Now()
for i := 0; i < NumRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < NumIncrements; j++ {
c.Increment()
}
}()
}
for i := 0; i < NumRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < NumReads; j++ {
_ = c.ReadWithoutRW()
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Without RW: Count=%d, Elapsed=%v
", c.Read(), elapsed)
}
func runWithRW() {
var wg sync.WaitGroup
c := Counter{}
start := time.Now()
for i := 0; i < NumRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < NumIncrements; j++ {
c.IncrementWithRW()
}
}()
}
for i := 0; i < NumRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < NumReads; j++ {
_ = c.ReadWithRW()
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("With RW: Count=%d, Elapsed=%v
", c.Read(), elapsed)
}
func main() {
runWithoutRW()
runWithRW()
}
可以看到,在使用读写锁的情况下,程序的运行时间大大缩短了,因为多个读取操作可以同时进行,并且不会受到其他线程的写入操作的干扰。而在没有使用读写锁的情况下,因为每次进行计数器递增时需要加锁,因此需要等待其他线程的锁释放,导致程序的执行时间相对较长。
四、信号量
信号量(Semaphore)是一种同步原语,用于实现多线程和多进程之间的同步和互斥。信号量的本质是一个整数计数器,通常用于限制对共享资源的访问数量。信号量的实现涉及到两个关键操作:wait(或称为P操作)和post(或称为V操作)。
基本实现原理
初始化:信号量在创建时需要进行初始化,通常将计数器设置为允许同时访问共享资源的最大数量。
Wait(P)操作:当一个线程或进程想要访问共享资源时,会执行wait操作。在wait操作中,信号量的计数器减1。如果计数器的值为负数,表示没有可用的资源,执行wait操作的线程/进程将被阻塞,直到有资源可用。
Post(V)操作:当一个线程或进程完成对共享资源的访问后,会执行post操作。在post操作中,信号量的计数器加1。如果计数器的值小于等于0,表示有等待的线程/进程,此时会唤醒一个被阻塞的线程/进程
c++版本
在 C++ 中,可以使用 标准库来实现信号量。 标准库提供 std::semaphore 类,其构造函数可以指定信号量的初值。std::semaphore 类包含了主要的信号量操作函数 wait 和 post,它们分别用于获取和释放信号量。
以下是信号量的操作函数:
std::semaphore::semaphore(int n = 0):构造函数,创建一个初始为 n 的信号量,如果未指定 n 的值,则默认为 0。
void std::semaphore::wait():获取信号量,如果信号量的计数器为 0,则当前线程将阻塞等待信号量的释放。
void std::semaphore::post():释放信号量,将信号量的计数器加 1。
使用 sem_t 类型变量 sem 来实现信号量,并将其初始值设为 0。我们创建 5 个线程,在每个线程中执行一些操作,并将结果记录在 results 中,然后执行 sem_post 操作来释放信号量,表示当前线程已完成。
#include <semaphore.h>
#include <iostream>
#include <thread>
#include <vector>
std::vector<int> results;
sem_t sem;
void thread_function(int i) {
std::cout << "Thread " << i << " started." << std::endl;
/* simulate some work */
std::this_thread::sleep_for(std::chrono::milliseconds(500));
/* add result */
results[i] = i;
/* signal completion */
sem_post(&sem);
std::cout << "Thread " << i << " finished." << std::endl;
}
int main() {
/* initialize semaphore */
sem_init(&sem, 0, 0);
/* initialize results vector */
results.resize(5);
/* create threads */
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(thread_function, i);
}
/* wait for all threads to finish */
for (int i = 0; i < 5; ++i) {
sem_wait(&sem);
}
/* print results */
std::cout << "Results: ";
for (auto r : results) {
std::cout << r << " ";
}
std::cout << std::endl;
/* cleanup semaphore */
sem_destroy(&sem);
/* join threads */
for (auto& t : threads) {
t.join();
}
return 0;
}
golang版本
Golang 中的信号量通过 chan 来实现。具体来说,可以使用以下代码实现信号量:
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore {
return make(Semaphore, n)
}
func (s Semaphore) Wait() {
s <- struct{}{}
}
func (s Semaphore) Signal() {
<-s
}