您现在的位置是:首页 >学无止境 >C++并发线程 - 如何线程间共享数据【如何使用锁操作】网站首页学无止境

C++并发线程 - 如何线程间共享数据【如何使用锁操作】

Allen.Su 2024-08-08 00:01:02
简介C++并发线程 - 如何线程间共享数据【如何使用锁操作】

系列文章目录

C++高性能优化编程系列
深入理解软件架构设计系列
高级C++并发线程编程
深入理解设计模式系列

期待你的关注哦!!!

快乐在于态度,成功在于细节,命运在于习惯。
Happiness lies in the attitude, success lies in details, fate is a habit.


具体哪个线程按何种方式访问什么数据?还有,一旦改动了数据,如果牵涉到其他线程,它们要在何时以什么通信方式获得通知?同一进程内的多个线程之间,虽然可以简单易行地共享数据,但这不是绝对的优势,优势甚至是很大的劣势。不正确使用共享数据,是产生与开发有关的错误的一个很大的诱因。

如果共享数据都是只读数据,就不会有问题;但是,同时一旦有删除删除数据就会出现问题。

在并发编程中,操作由两个或者多个线程负责,它们争先让线程执行各自的操作,而结果取决于它们执行的相对次序,所有这种情况都是条件竞争

1、但是如何防止恶性的条件竞争呢?

  • 有锁数据结构:采取保护措施包装数据结构,确保不变量被破坏时,中间状态只对执行改动线程可见。在其他访问同一数据结构的视角中,这种改动要么尚未开始,要么已经完成。
  • 无锁数据结构:修改数据结构的设计及其不变量,由一连串不可拆分的改动完成数据变更,每个改动都维持数据变量不被破坏。通常这种编程难以正确编写。

保护共享数据的最基本方式就是互斥

2、如何用互斥保护共享数据?

访问一个数据结构前,先锁住与数据相关的互斥;访问结束后,再解锁互斥。C++线程库保证了,一旦有线程锁住了某个互斥,若其他线程试图再给它加锁,则须等待,直至最初成功加锁的线程把该互斥解锁。这确保了全部线程所见到的共享数据是正确的,不变量没有没有被破坏。

2.1 如何使用互斥?

通过 std::mutex的实例创建互斥,调用成员函数lock()对其加锁,调用unlock()解锁。但是一般不推荐直接调用成员函数的做法。原因是若按此处理,那就必须记住,在函数以外的每条代码路径上都要调用unlock(),包括由于异常导致退出的路径。

C++标准库提供了类模版std::lock_guard<>,针对互斥融合实现了RAII机制:在构造是给互斥加锁,在析构时给互斥解锁,从而保证互斥总被正确的解锁。

如下,用互斥保护链表:

#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
void add_to_list(int new_value)
{
	std::lock_guard<std::mutex> guard(some_mutex);
	some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
	std::lock_gurd<std::mutex> guard(some_mutex);
	return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}

2.2 本意互斥保护数据却留有余地,如何防止隐患呢?

如果成员函数返回的指针或者引用,指向受保护的共享数据,那么即便成员函数全都良好、有序的方式锁定互斥,仍然无济于事,因为受保护已被打破,出现大漏洞。 只要存在任何能访问该指针和引用的代码,它就可以访问受保护的共享数据,则须谨慎设计程序接口,从而确保互斥先行锁定,再对受保护的共享数据进行访问,并保证不留后门。

我们来看看,意外的向外传递引用,指向受保护的共享数据:

class some_data
{
	int a;
	std::string b;
public:
	void do_something();
};
class data_wrapper
{
private:
	some_data data;
	std::mutex m;
public:
	template<typename Fubction>
	void process_data(Function func)
	{
		std::lock_guard<std::mutex> l(m);
		//向使用者提供的函数传递受保护的数据
		func(data);
	}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
	unprotected = &protected_data;
}
data_wrapper x;
void foo()
{
	//传入恶意函数
	x.process_data(malicious_function);
	//以无保护方式访问本应受保护的共享数据
	unprotected->do_something();
}

我们除了要检查成员函数,防止向调用者传出指针或引用,还必须检查另一种情况:若成员函数在自身内部调用了别的函数,而这些函数却不受我们掌控,那么,也不得向他们传递这些指针或引用。如果有就很危险。

2.3 如何解决容器本身接口固有的条件竞争?

多线程访问的情况下STL容器内的empty()size()的结果不可信。尽管,在某个线程调用empty()size()时,返回值可能是正确的。然而,一旦函数返回,其他线程就不再受限,从而能自由地访问栈容器,可能马上有新元素入栈,或者,现有的元素会立刻出栈,令前面的线程得到结果失效而无法使用。

在空栈上调用top()会导致未定义行为。

但是如何解决上述问题?

  • 传入引用;
  • 提供不抛出异常的拷贝构造函数或不抛出异常的移动构造函数;
  • 返回指针指向弹出的元素。

如下代码,线程安全的栈容器类:

#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
	const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
	std::statck<T> data;
	mutable std::mutex m;
public:
	threadsafe_stack(){}
	threadsafe_stack(const threadsafe_stack& other)
	{
		std::lock_guard<std::metux> lock(other.m);
		//在构造函数的函数体(constructor body)内进行复制操作
		data = other.data;
	}
	//将赋值运算符删除
	threadsafe_stack& operator=(const threadsafe_stack&) = delete;
	void push(T new_value)
	{
		std::lock_guard<std::mutex> lock(m);
		data.push(std::move(new_value));
	}
	//返回std::share_ptr<T>
	std::share_ptr<T> pop()
	{
		std::lock_guard<std::mutex> lock(m);
		//试图在弹出前检查是否为空栈
		if(data.empty()) throw empty_stack();
		//改动栈容器前设置返回值
		std::share_ptr<T> const res(std::make_shared<T>(data.top()));
		data.pop();
		return res;
	}
	//接收引用参数,指向某外部变量的地址,存储弹出的值
	void pop(T& value)
	{
		std::lock_guard<std::mutex> lock(m);
		if(data.empty()) throw empty_stack();
		value = data.top();
		data.pop();
	}
	bool empty() const
	{
		std::lock_guard<std::mutex> lock(m);
		return data.empty();
	}
};

2.4 如何解决死锁问题?

线程在互斥上争夺抢锁:有两个线程,都需同时锁住两个互斥,都等待着再给另一个互斥加锁。于是,双方毫无进展,因为它们同时在苦苦等待对方解锁互斥。此时造成死锁。
死锁:两个线程互相等待,停滞不前。

防范死锁的建议通常是始终按相同的顺序对两个互斥加锁。若我们总是先锁互斥A,在锁互斥B,则永远不会发生死锁。

(1)运用std::lock函数和std::lock_guard类模版,进行内部数据的互换操作:

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
	some_big_object some_detail;
	std::mutex m;
public:
	X(some_big_object const& sd):some_detail(sd){}
	friend void swap(X& lhs, X& rhs)
	{
		if(&lhs == &rhs)
			return;
		std::lock(lhs.m, rhs.m);
		//std::adopt_lock指明互斥上已被锁住,即互斥上有锁存在
		std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
		std::lock_guard<std::mutex> lock_a(rhs.m, std::adopt_lock);
		swap(lhs.some_detail, rhs.some_detail);
    }
}

std::adopt_lock指明互斥上已被锁住,即互斥上有锁存在

(2)使用std::unique_lock锁,如下代码:

//实例std::defer_lock将互斥保留为无锁状态
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(lhs.m, std::defer_lock);
//到这里才对互斥加锁
std::lock(lock_a, lock_b);

std::unique_lock占用更多的空间,也比std::lock_guard更慢,但是std::unique_lock对象可以不占用关联的互斥,具备这份灵活性需要付出代价:需要存储并且更新互斥信息。
std::defer_lock将互斥保留为无锁状态

(3)C++17 提供一个新的RAII类模版std::scoped_lock<>。std::scoped_lock<>和std::scoped_guard<> 完全等价,只不过前者是可变参数类模版,接收各种互斥类型作为模版参数类表,还以多个互斥对象作为构造函数的参数列表。

swap(X& lhs, X& rhs)
{
	if(&lhs == &rhs)
		return;
	std::scoped_lock guard(lhs.m, rhs.m);

	swap(lhs.some_detail, rhs.some_detail);	
}

std::scoped_lock guard(lhs.m, rhs.m)等价于 std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);

2.5 如何使用std::unique_lock转移互斥归属权?

因为std::unique_lock实例不占有与之关联的互斥,所以随着其实例的转移,互斥的归属权可以在多个std::unique_lock实例之间转移。std::unique_lock是可移动不可复制

转移有一种用途:准许函数锁定互斥,然后把互斥的归属权转移给函数调用者,好让它在同一锁的保护下执行其他操作

看如下代码,get_lock() 函数先锁定互斥,接着对数据做前期准备,再将归属权返回给调用者:

std::unique_lock<std::mutex> get_lock()
{
	extern std::mutex some_mutex;
	std::unique_lock<std::mutex> lk(some_mutex);
	prepare_data();
	return lk;
}
void process_data()
{
	std::unique_lock<std::mutex> lk(get_lock());
	do_something();
}

由于锁lk是get_lock函数中声明的std::unique_lock局部变量,因此代码无需调用std::move()就能把它直接返回,编译器会妥善调用移动构造函数。

2.6 如何使用std::unique_lock按合适的粒度加锁?

粒度精细的锁保护少量数据,而粒度粗大的锁保护大量数据。锁操作有两个要点:

  • 选择足够粗大的锁粒度,确保目标数据受到保护;
  • 限制范围,务求只在必要的操作过程中持锁。

std::unique_lock类具有成员函数lock()unlock()try_lock()
可用std::unique_lock处理:假如代码不再需要访问共享数据,那么我们就调用unlock()解锁;若以后需要重新访问,则调用lock()加锁。

void get_and_process_data()
{
	std::unique_lock<std::mutex> my_lock(the_mutex);
	some_class data_to_process = get_next_data_chunk();
	my_lock.unlock();
	//假定调用process()期间,互斥无须加锁
	result_type result = process(data_to_process);
	my_lock.lock();
	//重新锁住互斥,以写出结果
	write_result(data_to_process, result);
}

若只用单独一个互斥保护整个数据结构,不但可以加剧锁的争夺,还将难以缩短缩短持锁时间。假设某种操作需对同一个互斥全程加锁,当中步骤越多,则持锁时间越久。这是一种双重损失,恰恰加倍促使我们尽可能该用粒度精细的锁。

比如删除某数据,得先进行查询再删除,可以先获取拷贝对象数据,遍历完再进行删除。减少锁持续时间。

2.7 如果对很少更新的数据结构该如何优化加锁?

采用std::mutex保护数据结构过于严苛,原因是即便没发生改动,他照样会禁止并发访问。C++17标准库提供std::share_mutex,我们需要采用新类型的互斥。由于新类型的互斥具有两种不同的使用方式,因此通常被称为读写互斥:允许单独一个“写线程”进行完全排它访问,也允许多个“读线程”共享数据或并发访问。

共享锁即读锁,对应std::shared_lock<std::shared_mutex>
排他锁即写锁,对应std::lock_guard<std::shared_mutex>和std::unique_guard<std::shared_mutex>

运用 std::shared_mutex 保护数据结构,代码如下:

#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>
class dns_enty;
class dns_cache
{
	std::map<std::string, dns_entry> entries;
	mutable std::shared_mutex entry_mutex;
public:
	dns_entry find_entry(std::string const& domain) const
	{
		std::shared_lock<std::shared_mutex> lk(entry_mytex);
		std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
		return(it == entries.end()) ? dns_entry() : it->second;
	}
	void update_or_add_entry(std::string const& domain, dns_entry const& dns_details)
	{
		std::lock_guard<std::shared_mutex> lk(entry_mutex);
		entries[domain] = dns_details;
    }
	
};

2.8 如何递归加锁?

使用std::mutex,再次对其重新加锁就会出错,将导致未定义行为。
使用std::recursive_mutex允许同一线程对某互斥的统一实例多次加锁,我们必须先释放全部的锁,才可以让另一个线程锁住该互斥。

例如:
若我们对它调用3次lock(),就必须调用3次unlock()。只要正确的使用std::lock_guard<std::recurive_mutex> std::unique_guard<std::recurive_mutex>,它们便会处理好递归锁。

3、多线程中如何在初始化过程中保护共享数据

用互斥实现线程安全的延迟初始化,代码如下:

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
	//此处,全部线程都被迫循环运行
	std::unique_lock<std::mutex> lk(resource_mutex);
	if(resource_ptr)
	{
		//仅有初始化需要保护数据
		resource_ptr.reset(new some_resource);
	}
	lk.unlock();
	resource_ptr->do_something();
}

不过数据为多线程使用,那么它们便无法并发访问,线程只能毫无必要的运行,因为每个线程都必须在互斥上轮候,等待查验数据是否已经完成初始化。

为此,C++标准库中提供了std::once_flag类和std::call_once()函数。
令所有线程共同调用std::call_once函数,从而确保在该调用返回时,指针初始化由其中某线程安全且唯一完成(通过适合的同步机制)。必要同步数据则有std::once_flag实例存储,每个std::call_flag实例对应一次不同的初始化。相比显示使用互斥,std::call_once()函数的额外开销往往更低,特别是在初始已经完成的情况下,所以如果功能符合需求就应优先使用。

延迟初始化代码如下:

std::shared_ptr<some_resource> resource_ptr;
//实例存储
std::once_flag resource_flag;
void init_resource()
{
	resource_ptr.reset(new some_resource);
}
void foo()
{
	//初始化函数准确地被唯一一次调用
	std::call_once(resource_flag, init_resource);
	resource_ptr->do_something();
}

利用std::call_once()函数对类X的数据成员实施线程安全的延迟初始化:

class X
{
private:
	connection_info connection_details;
	connection_handle connection;
	std::once_flag connection_init_flag;
	void open_connection()
	{
		connection = connection_manager.open(connection_details);
    }
public:
	X(connection_info const& connection_details_);
		connection_details(connection_details_)
		{}
	void send_data(data_packet const& data)
	{
		std::call_once(connection_init_flag, &X::open_connection, this);
		connection.send_data(data);
	}
	data_packet receive_data()
	{
		std::call_once(connection_init_flag, &X::open_connection, this);
		return connection.receive_data();
	}
};

某些类的代码只需用到唯一一个全局实例,这种情形可用以下方法代替std::call_once():

class my_class;
//线程安全的初始化,C++11标准保证其正确性
my_class& get_my_class_instance()
{
	static my_class inctance;
	return instance;
}

多个线程可以安全地调用get_my_class_instance(),而无需担忧初始化的条件竞争。

4、小结

世上无难事,只怕有心人。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。