您现在的位置是:首页 >学无止境 >C++并发线程 - 如何线程间共享数据【如何使用锁操作】网站首页学无止境
C++并发线程 - 如何线程间共享数据【如何使用锁操作】
系列文章目录
C++高性能优化编程系列
深入理解软件架构设计系列
高级C++并发线程编程
深入理解设计模式系列
期待你的关注哦!!!
快乐在于态度,成功在于细节,命运在于习惯。
Happiness lies in the attitude, success lies in details, fate is a habit.
线程间共享数据
具体哪个线程按何种方式访问什么数据?还有,一旦改动了数据,如果牵涉到其他线程,它们要在何时以什么通信方式获得通知?同一进程内的多个线程之间,虽然可以简单易行地共享数据,但这不是绝对的优势,优势甚至是很大的劣势。不正确使用共享数据,是产生与开发有关的错误的一个很大的诱因。
如果共享数据都是只读数据,就不会有问题;但是,同时一旦有删除删除数据就会出现问题。
在并发编程中,操作由两个或者多个线程负责,它们争先让线程执行各自的操作,而结果取决于它们执行的相对次序,所有这种情况都是条件竞争
。
1、但是如何防止恶性的条件竞争呢?
有锁数据结构
:采取保护措施包装数据结构,确保不变量被破坏时,中间状态只对执行改动线程可见。在其他访问同一数据结构的视角中,这种改动要么尚未开始,要么已经完成。无锁数据结构
:修改数据结构的设计及其不变量,由一连串不可拆分的改动完成数据变更,每个改动都维持数据变量不被破坏。通常这种编程难以正确编写。
保护共享数据的最基本方式就是互斥。
2、如何用互斥保护共享数据?
访问一个数据结构前,先锁住与数据相关的互斥;访问结束后,再解锁互斥。C++线程库保证了,一旦有线程锁住了某个互斥,若其他线程试图再给它加锁,则须等待,直至最初成功加锁的线程把该互斥解锁。这确保了全部线程所见到的共享数据是正确的,不变量没有没有被破坏。
2.1 如何使用互斥?
通过 std::mutex
的实例创建互斥,调用成员函数lock()
对其加锁,调用unlock()
解锁。但是一般不推荐直接调用成员函数的做法。原因是若按此处理,那就必须记住,在函数以外的每条代码路径上都要调用unlock(),包括由于异常导致退出的路径。
C++标准库提供了类模版std::lock_guard<>
,针对互斥融合实现了RAII机制
:在构造是给互斥加锁,在析构时给互斥解锁,从而保证互斥总被正确的解锁。
如下,用互斥保护链表:
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_gurd<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}
2.2 本意互斥保护数据却留有余地,如何防止隐患呢?
如果成员函数返回的指针或者引用,指向受保护的共享数据,那么即便成员函数全都良好、有序的方式锁定互斥,仍然无济于事,因为受保护已被打破,出现大漏洞。
只要存在任何能访问该指针和引用的代码,它就可以访问受保护的共享数据,则须谨慎设计程序接口,从而确保互斥先行锁定,再对受保护的共享数据进行访问,并保证不留后门。
我们来看看,意外的向外传递引用,指向受保护的共享数据:
class some_data
{
int a;
std::string b;
public:
void do_something();
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Fubction>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
//向使用者提供的函数传递受保护的数据
func(data);
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected = &protected_data;
}
data_wrapper x;
void foo()
{
//传入恶意函数
x.process_data(malicious_function);
//以无保护方式访问本应受保护的共享数据
unprotected->do_something();
}
我们除了要检查成员函数,防止向调用者传出指针或引用,还必须检查另一种情况:若成员函数在自身内部调用了别的函数,而这些函数却不受我们掌控,那么,也不得向他们传递这些指针或引用。如果有就很危险。
2.3 如何解决容器本身接口固有的条件竞争?
多线程访问的情况下STL容器
内的empty()
和size()
的结果不可信。尽管,在某个线程调用empty()
或size()
时,返回值可能是正确的。然而,一旦函数返回,其他线程就不再受限,从而能自由地访问栈容器,可能马上有新元素入栈,或者,现有的元素会立刻出栈,令前面的线程得到结果失效而无法使用。
在空栈上调用top()会导致未定义行为。
但是如何解决上述问题?
- 传入引用;
- 提供不抛出异常的拷贝构造函数或不抛出异常的移动构造函数;
- 返回指针指向弹出的元素。
如下代码,线程安全的栈容器类:
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::statck<T> data;
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::metux> lock(other.m);
//在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
//将赋值运算符删除
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
//返回std::share_ptr<T>
std::share_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//试图在弹出前检查是否为空栈
if(data.empty()) throw empty_stack();
//改动栈容器前设置返回值
std::share_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
//接收引用参数,指向某外部变量的地址,存储弹出的值
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
2.4 如何解决死锁问题?
线程在互斥上争夺抢锁:有两个线程,都需同时锁住两个互斥,都等待着再给另一个互斥加锁。于是,双方毫无进展,因为它们同时在苦苦等待对方解锁互斥。此时造成死锁。
死锁:两个线程互相等待,停滞不前。
防范死锁的建议通常是始终按相同的顺序对两个互斥加锁。若我们总是先锁互斥A,在锁互斥B,则永远不会发生死锁。
(1)运用std::lock函数和std::lock_guard类模版,进行内部数据的互换操作:
class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs == &rhs)
return;
std::lock(lhs.m, rhs.m);
//std::adopt_lock指明互斥上已被锁住,即互斥上有锁存在
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_a(rhs.m, std::adopt_lock);
swap(lhs.some_detail, rhs.some_detail);
}
}
std::adopt_lock指明互斥上已被锁住,即互斥上有锁存在
。
(2)使用std::unique_lock
锁,如下代码:
//实例std::defer_lock将互斥保留为无锁状态
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(lhs.m, std::defer_lock);
//到这里才对互斥加锁
std::lock(lock_a, lock_b);
std::unique_lock
占用更多的空间,也比std::lock_guard
更慢,但是std::unique_lock
对象可以不占用关联的互斥,具备这份灵活性需要付出代价:需要存储并且更新互斥信息。
std::defer_lock将互斥保留为无锁状态
。
(3)C++17 提供一个新的RAII类模版std::scoped_lock<>。std::scoped_lock<>和std::scoped_guard<> 完全等价,只不过前者是可变参数类模版,接收各种互斥类型作为模版参数类表,还以多个互斥对象作为构造函数的参数列表。
swap(X& lhs, X& rhs)
{
if(&lhs == &rhs)
return;
std::scoped_lock guard(lhs.m, rhs.m);
swap(lhs.some_detail, rhs.some_detail);
}
std::scoped_lock guard(lhs.m, rhs.m)等价于 std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
2.5 如何使用std::unique_lock转移互斥归属权?
因为std::unique_lock
实例不占有与之关联的互斥,所以随着其实例的转移,互斥的归属权可以在多个std::unique_loc
k实例之间转移。std::unique_lock是可移动不可复制
。
转移有一种用途:准许函数锁定互斥,然后把互斥的归属权转移给函数调用者,好让它在同一锁的保护下执行其他操作。
看如下代码,get_lock() 函数先锁定互斥,接着对数据做前期准备,再将归属权返回给调用者:
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}
由于锁lk是get_lock函数中声明的std::unique_lock局部变量,因此代码无需调用std::move()就能把它直接返回,编译器会妥善调用移动构造函数。
2.6 如何使用std::unique_lock按合适的粒度加锁?
粒度精细的锁保护少量数据,而粒度粗大的锁保护大量数据。锁操作有两个要点:
- 选择足够粗大的锁粒度,确保目标数据受到保护;
- 限制范围,务求只在必要的操作过程中持锁。
std::unique_lock
类具有成员函数lock()
、unlock()
、try_lock()
可用std::unique_lock
处理:假如代码不再需要访问共享数据,那么我们就调用unlock()
解锁;若以后需要重新访问,则调用lock()
加锁。
void get_and_process_data()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
my_lock.unlock();
//假定调用process()期间,互斥无须加锁
result_type result = process(data_to_process);
my_lock.lock();
//重新锁住互斥,以写出结果
write_result(data_to_process, result);
}
若只用单独一个互斥保护整个数据结构,不但可以加剧锁的争夺,还将难以缩短缩短持锁时间。假设某种操作需对同一个互斥全程加锁,当中步骤越多,则持锁时间越久。这是一种双重损失,恰恰加倍促使我们尽可能该用粒度精细的锁。
比如删除某数据,得先进行查询再删除,可以先获取拷贝对象数据,遍历完再进行删除。减少锁持续时间。
2.7 如果对很少更新的数据结构该如何优化加锁?
采用std::mutex
保护数据结构过于严苛,原因是即便没发生改动,他照样会禁止并发访问。C++17
标准库提供std::share_mutex
,我们需要采用新类型的互斥。由于新类型的互斥具有两种不同的使用方式,因此通常被称为读写互斥:允许单独一个“写线程”进行完全排它访问,也允许多个“读线程”共享数据或并发访问。
共享锁即读锁,对应std::shared_lock<std::shared_mutex>
排他锁即写锁,对应std::lock_guard<std::shared_mutex>和std::unique_guard<std::shared_mutex>
运用 std::shared_mutex
保护数据结构,代码如下:
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>
class dns_enty;
class dns_cache
{
std::map<std::string, dns_entry> entries;
mutable std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const
{
std::shared_lock<std::shared_mutex> lk(entry_mytex);
std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
return(it == entries.end()) ? dns_entry() : it->second;
}
void update_or_add_entry(std::string const& domain, dns_entry const& dns_details)
{
std::lock_guard<std::shared_mutex> lk(entry_mutex);
entries[domain] = dns_details;
}
};
2.8 如何递归加锁?
使用std::mutex
,再次对其重新加锁就会出错,将导致未定义行为。
使用std::recursive_mutex
允许同一线程对某互斥的统一实例多次加锁,我们必须先释放全部的锁,才可以让另一个线程锁住该互斥。
例如:
若我们对它调用3次lock(),就必须调用3次unlock()。只要正确的使用std::lock_guard<std::recurive_mutex>
和 std::unique_guard<std::recurive_mutex>
,它们便会处理好递归锁。
3、多线程中如何在初始化过程中保护共享数据
用互斥实现线程安全的延迟初始化,代码如下:
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
//此处,全部线程都被迫循环运行
std::unique_lock<std::mutex> lk(resource_mutex);
if(resource_ptr)
{
//仅有初始化需要保护数据
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}
不过数据为多线程使用,那么它们便无法并发访问,线程只能毫无必要的运行,因为每个线程都必须在互斥上轮候,等待查验数据是否已经完成初始化。
为此,C++标准库中提供了std::once_flag
类和std::call_once()
函数。
令所有线程共同调用std::call_once
函数,从而确保在该调用返回时,指针初始化由其中某线程安全且唯一完成(通过适合的同步机制)。必要同步数据则有std::once_flag
实例存储,每个std::call_flag
实例对应一次不同的初始化。相比显示使用互斥,std::call_once()
函数的额外开销往往更低,特别是在初始已经完成的情况下,所以如果功能符合需求就应优先使用。
延迟初始化代码如下:
std::shared_ptr<some_resource> resource_ptr;
//实例存储
std::once_flag resource_flag;
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
//初始化函数准确地被唯一一次调用
std::call_once(resource_flag, init_resource);
resource_ptr->do_something();
}
利用std::call_once()
函数对类X的数据成员实施线程安全的延迟初始化:
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection = connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_);
connection_details(connection_details_)
{}
void send_data(data_packet const& data)
{
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}
data_packet receive_data()
{
std::call_once(connection_init_flag, &X::open_connection, this);
return connection.receive_data();
}
};
某些类的代码只需用到唯一一个全局实例,这种情形可用以下方法代替std::call_once()
:
class my_class;
//线程安全的初始化,C++11标准保证其正确性
my_class& get_my_class_instance()
{
static my_class inctance;
return instance;
}
多个线程可以安全地调用get_my_class_instance(),而无需担忧初始化的条件竞争。
4、小结
世上无难事,只怕有心人。