您现在的位置是:首页 >技术教程 >锁与信号量的深度总结与实现【c++与golang】网站首页技术教程

锁与信号量的深度总结与实现【c++与golang】

UPUP小亮 2024-10-16 12:01:04
简介锁与信号量的深度总结与实现【c++与golang】


前言

线程同步和互斥是多线程编程中的两个重要概念,它们都是为了解决多线程环境下的资源共享和访问问题。
**线程同步:**线程同步是指多个线程按照一定的顺序执行,以确保在访问共享资源时,不会出现数据不一致的问题。同步机制主要包括条件变量和信号量。
**线程互斥:**线程互斥是指在同一时刻,只允许一个线程访问共享资源,以防止数据不一致和竞争条件。互斥锁是实现线程互斥的主要工具。
在操作系统中,锁、条件变量和信号量都是用来实现线程同步与互斥的机制。
锁(Lock)是一种互量,用于保护共享资源,防止多个线程同时访问。当一个线程获得锁时,其他线程必须等待该线程释放锁后才能访问共享资源。常见的锁包括互斥锁和读写锁。
条件变量(Condition Variable)是一种线程间通信的机制,用于等待某个条件的发生。当一个线程等待某个条件时,它会阻塞并释放锁,直到另一个线程发出信号通知该条件已经满足,该线程才会被唤醒并重新获得锁。
信号量(Semaphore)是一种计数器用于控制多个线程对共享资源的访问。当一个线程访问共享资源时它会尝试获取信号量,如果号量的值大于 0,则该线程可以访问共享资源并将信号量的值减 1;如果信号量的值等于 0,则该线程必须等待其他线程释放信号量后才能访问共享资源。

本文分别用c++与golang来使用各种锁,条件变量、信号量来实现线程间的同步和互斥

一、互斥锁和自旋锁

与单线程编程不同的是,在多线程编程中,多个线程之间可能会同时访问同一个变量、函数或对象等共享资源,因此需要采取措施来避免数据竞争等并发问题。

在 C++ 中,mutex 是一个同步原语,用于协调不同线程的访问和修改。C++11 引入了标准库中的 std::mutex 类,它提供了 lock() 和 unlock() 两个函数来控制互斥锁的状态。
std::mutex 的本质是一个操作系统提供的锁,它分为用户模式锁和内核模式锁。当线程试图去获取锁时,如果锁当前没有被其它线程持有,则该线程将获得这个锁,并且可以继续执行。否则,该线程将进入阻塞状态,直到锁被释放为止。
**在 Go 中,sync.Mutex 是互斥锁的实现。**它有两个方法:Lock 和 Unlock,用于控制互斥锁的状态。sync.Mutex 的底层实现是基于操作系统提供的 futex(fast userspace mutex)机制,它使得线程在获取锁时可以避免进入内核态,并且能够更加高效地进行线程上下文切换。这使得 Go 中的互斥锁在资源竞争较小、并发度较高的情况下有着良好的性能表现。

c++版本

我们首先定义了一个共享变量 num,并在 AddWithOutLock 和 AddWithLock 两个函数中分别对其进行累加操作。其中,AddWithOutLock 是不加锁的版本,而 AddWithLock 是使用互斥锁保护的版本。

在 main 函数中,我们使用 emplace_back 函数向线程数组 threads 中添加子线程对象,并分别传入 AddWithOutLock 和 AddWithLock 函数来创建十个不加锁和加锁的线程。接着,我们使用 join 函数阻塞主线程,等待所有线程执行完毕,并输出累加结果


mutex mtx;
int num = 0;
void AddWithLock() {
   for (int i = 0; i < 2000; i++) {
      mtx.lock();
      num++;
      mtx.unlock();
   }
}
void AddWithOutLock() {
   for (int i = 0; i < 2000; i++) {
      num++;
   }
}

int main() {
   vector<thread> threads;  // 创建线程数组
   for (int i = 0; i < 10; i++) {
      threads.emplace_back(
          AddWithOutLock);  // 用 emplace_back 函数向数组添加元素
   }
   for (auto& t : threads)  // 遍历线程数组
   {
      t.join();  // 阻塞当前线程,等待子线程结束
   }
   cout << "AddWithOutLock: " << num << endl;

   // 加锁版测试
   num = 0;          // 重置 num 值
   threads.clear();  // 清空线程数组
   for (int i = 0; i < 10; i++) {
      threads.emplace_back(AddWithLock);
   }
   for (auto& t : threads) {
      t.join();
   }
   cout << "AddWithLock: " << num << endl;

   return 0;
}

从运行结果可以看出,不加锁的版本出现了数据错误,而加锁的版本则能够正确地累加结果,说明使用互斥锁确实可以有效避免并发问题。
在这里插入图片描述

golang版本

在 main 函数中,我们创建了十个 AddWithOutLock 和 AddWithLock 的 goroutine,并且使用 wg.Add 和 wg.Wait 方法控制并发,保证所有 goroutine 都执行完毕后再输出结果。在 Goroutine 中,和C++代码一样,我们使用 sync.Mutex 来实现互斥锁的功能,保证 num 变量的线程安全。



var mtx sync.Mutex
var wg sync.WaitGroup
var num int
func AddWithLock() {
	defer wg.Done()
	for i:=0;i<2000;i++{
	mtx.Lock()
		num++
	mtx.Unlock()
   }
}
func AddWithOutLock() {
	defer wg.Done()
	for i:=0;i<2000;i++{
		num++
   }
}
func main() {
    wg.Add(10)
	for i:=0;i<10;i++{
		go AddWithOutLock()
	}
    wg.Wait()
	fmt.Println("AddWithOutLock:",num)
	
	
	num=0
		wg.Add(10)
	for i:=0;i<10;i++{
		go AddWithLock()
	}
    wg.Wait()
	fmt.Println("AddWithLock:",num)
}

从运行结果可以看出,不加锁的版本出现了数据错误,而加锁的版本则能够正确地累加结果,说明使用互斥锁确实可以有效避免并发问题。
在这里插入图片描述

总结:

至此,本文已经介绍了如何使用互斥锁确保多线程对共享变量的安全访问,同时给出了 C++ 和 Go 两种语言的示例代码。虽然语言不同,但是实现的基本原理是相同的。在编写多线程程序时,无论是 C++、Go 还是其他语言,都需要注意多个线程之间的共享资源访问协调,使用适当的同步机制保证线程安全。

二、条件变量

条件变量(Condition Variable)是一种用于实现线程间同步的原语。条件变量允许线程等待某个条件的满足,当条件满足时,其他线程会通知等待的线程。条件变量通常与互斥锁(Mutex)一起使用,以保护共享资源的访问和同步。当一个线程需要等待某个条件成立时,它可以调用条件变量的等待函数将自己阻塞,同时释放持有的互斥锁,当条件成立时,另一个线程可以调用条件变量的通知函数唤醒等待线程。

在 C++ 中,我们可以通过 std::condition_variable 类的 wait() 方法等待条件满足。当条件不满足时,wait() 方法会阻塞当前线程,并释放该条件变量绑定的锁;当条件满足时,wait() 方法会重新获得该锁并返回。
同时,使用 notify_one() 或 notify_all() 方法来通知等待在该条件变量上的线程继续执行。
在golang中,使用sync.Cond可以实现条件变量。func (c Cond) Wait()阻塞等待条件变量满足,释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()func (c *Cond) Signal():单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知【一般都使用这个】

c++版本

互斥量 mtx 用于保护共享数据 product,而条件变量 cond 用于实现线程同步和通信。生产者线程在生产随机数并将其放入缓冲区之前**,首先需要获取互斥量 mtx 的锁,以防止其他线程访问共享数据 product。如果缓冲区已经满了,生产者线程需要调用 cond.wait(lk) 进行等待,此时会自动释放互斥量 mtx 的锁,并阻塞当前线程,等待其他线程发出通知**。当缓冲区有空位时,生产者线程会产生一个随机数并将其放入缓冲区,然后解锁互斥量 mtx 并通知所有等待的线程,即 cond.notify_all()

消费者线程的实现与生产者线程类似,也需要先获取互斥量 mtx 的锁,并在缓冲区为空时等待其他线程发出通知。当缓冲区中有数据可供消费时,消费者线程会从缓冲区中取出一个随机数进行消费,然后解锁互斥量 mtx 并通知所有等待的线程。


std::queue<int> product;
std::mutex mtx;
std::condition_variable cond;

void producer(int id) {
   while (true) {
      std::unique_lock<std::mutex> lk(mtx);  // 加锁
      while (product.size() == 5) {          // 缓冲区满了,等待消费
         cond.wait(lk);
      }
      int num = rand() % 1000;
      product.push(num);
      std::cout << "生产者:" << id << " 生产: " << num << std::endl;
      lk.unlock();        // 解锁
      cond.notify_all();  // 通知所有等待的线程
      std::this_thread::sleep_for(std::chrono::milliseconds(300));
   }
}

void consumer(int id) {
   while (true) {
      std::unique_lock<std::mutex> lk(mtx);  // 加锁
      while (product.empty()) {              // 缓冲区为空,等待生产
         cond.wait(lk);
      }
      int num = product.front();
      product.pop();
      std::cout << "消费者: " << id << " 消费: " << num << std::endl;
      lk.unlock();        // 解锁
      cond.notify_all();  // 通知所有等待的线程
      std::this_thread::sleep_for(std::chrono::milliseconds(300));
   }
}

int main() {
   srand(time(nullptr));

   for (int i = 0; i < 5; ++i) {
      std::thread t(producer, i + 1);
      t.detach();
   }
   for (int i = 0; i < 5; ++i) {
      std::thread t(consumer, i + 1);
      t.detach();
   }

   while (true) {
      std::this_thread::yield();
   }

   return 0;
}

golang版本

当生产者发现缓冲区已经满了时,它会调用 cond.Wait() 方法来释放锁,并等待消费者唤醒它;当消费者发现缓冲区为空时,它也会调用 cond.Wait() 方法来释放锁,并等待生产者唤醒它。当有新的数据被生产或消费时,生产者或消费者会调用 cond.Signal() 或 cond.Broadcast() 方法来唤醒等待的生产者或消费者继续执行。

var cond sync.Cond      //定义全局条件变量
func producer(out chan<-int,idx int){
	for {
		//先加锁
		cond.L.Lock()
		//判断缓冲区是否满
		for len(out) == cap(out){
			cond.Wait()    //1,zuse 2 shifangsuo 3 huanxingjiasuo
		}
		num:= rand.Intn(999)
		out<- num
		fmt.Println("生产者:",idx,"生产:", num)
		//访问公共区结束,并且打印结束,解锁
		cond.L.Unlock()
		//唤醒对端
		cond.Signal()
		time.Sleep(time.Millisecond*300)
	}
}

func consumer(in <-chan int,idx int){
	for {
		cond.L.Lock()
		//判断缓冲区是否为空
		for len(in)== 0{
			cond.Wait()
		}
		num := <-in
		 fmt.Println("消费者:",idx,"消费:",num)
		//访问公共区结束后,解锁
		cond.L.Unlock()  //锁的力度越小越好
		cond.Signal()
		 time.Sleep(time.Millisecond*300)
	}
}

func main(){
	rand.Seed(time.Now().UnixNano())
	product := make(chan int,5)
	//指定条件变量 使用的锁
	cond.L = new(sync.Mutex)//互斥锁初值为0,是未加锁状态

	for i:=0;i<5;i++{
		go producer(product,i+1)

	}
	for i:=0;i<5;i++{
		go consumer(product,i+1)
	}
	for{
		;
	}
}

三、读写锁

读写锁与普通的互斥锁不同,它允许多个线程同时持有读锁,并且读取操作不会产生数据竞争,因此多个读取操作可以并发执行,提高了程序的并发性。而在写入操作时,为了保证数据的一致性,只能有一个线程持有写锁,其他的线程需要等待写锁被释放之后再进行操作。

在 C++ 中,可以使用标准库中的 std::shared_mutex 类型来实现读写锁。std::shared_mutex 类型提供了 lock_shared、unlock_shared、lock 和 unlock 等方法,可以方便地实现读写锁。但是在C++17之前没有 std::shared_mutex,本文采用 std::mutex 与 std::condition_variable 的方式实现读写锁

在 Go 中,读写锁可以使用标准库中的 sync.RWMutex 类型来实现。sync.RWMutex 类型提供了 RLock、RUnlock、Lock 和 Unlock 等方法,可以方便地实现读写锁。

c++版本

这一部分在前面的文章中已经详细介绍过了。
c++实利用互斥锁与条件变量现读写锁

golang版本

定义了一个 Counter 结构体,并提供了 Increment、IncrementWithRW、ReadWithoutRW 和 ReadWithRW 四个方法,分别实现了有读写锁与普通锁的计数器递增和读取操作。
在 runWithoutRW 函数中,我们使用互斥锁实现了计数器的增加操作,在 runWithRW 函数中,则采用读写锁实现对计数器的并发访问操作。

package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    NumRoutines     = 100      // 协程数量
    NumIncrements   = 10000    // 递增次数
    NumReads        = 50000    // 读取次数
)

type Counter struct {
    mu sync.Mutex
    rw sync.RWMutex
    count int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

func (c *Counter) IncrementWithRW() {
    c.rw.Lock()
    c.count++
    c.rw.Unlock()
}

func (c *Counter) ReadWithRW() int {
    c.rw.RLock()
    defer c.rw.RUnlock()
    return c.count
}
func (c *Counter) ReadWithoutRW() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func runWithoutRW() {
    var wg sync.WaitGroup
    c := Counter{}

    start := time.Now()

    for i := 0; i < NumRoutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < NumIncrements; j++ {
                c.Increment()
            }
        }()
    }

    for i := 0; i < NumRoutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < NumReads; j++ {
                _ = c.ReadWithoutRW()
            }
        }()
    }

    wg.Wait()

    elapsed := time.Since(start)
    fmt.Printf("Without RW: Count=%d, Elapsed=%v
", c.Read(), elapsed)
}

func runWithRW() {
    var wg sync.WaitGroup
    c := Counter{}

    start := time.Now()

    for i := 0; i < NumRoutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < NumIncrements; j++ {
                c.IncrementWithRW()
            }
        }()
    }

    for i := 0; i < NumRoutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < NumReads; j++ {
                _ = c.ReadWithRW()
            }
        }()
    }
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("With RW: Count=%d, Elapsed=%v
", c.Read(), elapsed)
}

func main() {
    runWithoutRW()
    runWithRW()
}

在这里插入图片描述

可以看到,在使用读写锁的情况下,程序的运行时间大大缩短了,因为多个读取操作可以同时进行,并且不会受到其他线程的写入操作的干扰。而在没有使用读写锁的情况下,因为每次进行计数器递增时需要加锁,因此需要等待其他线程的锁释放,导致程序的执行时间相对较长。

四、信号量

信号量(Semaphore)是一种同步原语,用于实现多线程和多进程之间的同步和互斥。信号量的本质是一个整数计数器,通常用于限制对共享资源的访问数量。信号量的实现涉及到两个关键操作:wait(或称为P操作)和post(或称为V操作)。
基本实现原理
初始化:信号量在创建时需要进行初始化,通常将计数器设置为允许同时访问共享资源的最大数量。
Wait(P)操作:当一个线程或进程想要访问共享资源时,会执行wait操作。在wait操作中,信号量的计数器减1。如果计数器的值为负数,表示没有可用的资源,执行wait操作的线程/进程将被阻塞,直到有资源可用。
Post(V)操作:当一个线程或进程完成对共享资源的访问后,会执行post操作。在post操作中,信号量的计数器加1。如果计数器的值小于等于0,表示有等待的线程/进程,此时会唤醒一个被阻塞的线程/进程

c++版本

在 C++ 中,可以使用 标准库来实现信号量。 标准库提供 std::semaphore 类,其构造函数可以指定信号量的初值。std::semaphore 类包含了主要的信号量操作函数 wait 和 post,它们分别用于获取和释放信号量。
以下是信号量的操作函数:
std::semaphore::semaphore(int n = 0):构造函数,创建一个初始为 n 的信号量,如果未指定 n 的值,则默认为 0。
void std::semaphore::wait():获取信号量,如果信号量的计数器为 0,则当前线程将阻塞等待信号量的释放。
void std::semaphore::post():释放信号量,将信号量的计数器加 1。
使用 sem_t 类型变量 sem 来实现信号量,并将其初始值设为 0。我们创建 5 个线程,在每个线程中执行一些操作,并将结果记录在 results 中,然后执行 sem_post 操作来释放信号量,表示当前线程已完成。

#include <semaphore.h>
#include <iostream>
#include <thread>
#include <vector>

std::vector<int> results;
sem_t sem;

void thread_function(int i) {
    std::cout << "Thread " << i << " started." << std::endl;

    /* simulate some work */
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    /* add result */
    results[i] = i;

    /* signal completion */
    sem_post(&sem);

    std::cout << "Thread " << i << " finished." << std::endl;
}

int main() {
    /* initialize semaphore */
    sem_init(&sem, 0, 0);

    /* initialize results vector */
    results.resize(5);

    /* create threads */
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(thread_function, i);
    }

    /* wait for all threads to finish */
    for (int i = 0; i < 5; ++i) {
        sem_wait(&sem);
    }

    /* print results */
    std::cout << "Results: ";
    for (auto r : results) {
        std::cout << r << " ";
    }
    std::cout << std::endl;

    /* cleanup semaphore */
    sem_destroy(&sem);

    /* join threads */
    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

golang版本

Golang 中的信号量通过 chan 来实现。具体来说,可以使用以下代码实现信号量:

type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
    return make(Semaphore, n)
}

func (s Semaphore) Wait() {
    s <- struct{}{}
}

func (s Semaphore) Signal() {
    <-s
}
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。