您现在的位置是:首页 >学无止境 >C++并发线程 - 如何管控线程【启动/暂停/停止/恢复】网站首页学无止境

C++并发线程 - 如何管控线程【启动/暂停/停止/恢复】

Allen.Su 2024-07-24 12:01:02
简介C++并发线程 - 如何管控线程【启动/暂停/停止/恢复】

系列文章目录

C++高性能优化编程系列
深入理解软件架构设计系列
高级C++并发线程系列
深入理解设计模式系列

超越昨天的自己 Keeps going beyond yesterday's own

1、线程最基本的使用 - 简单管控

如果实现并发,而且确定采用多线程技术。从何入手?怎么启动线程?怎么查验它们是否已经结束?怎么检测其运行状态?
接下来,我来介绍下:发起线程,然后等待它结束,或让它在后台运行。接着启动时向线程传递参数,以及如何把线程的归属权从某个std::thread对象转移给另一个。以及怎样识别一个线程。

1.1 发起线程
C++程序都含有至少一个线程,即运行 main()的线程,它由C++运行时系统启动。
发起线程两种方式:
(1) 通过将函数传递给std::thread的:

void func() {
  // do something
}
std::thread t(func);

(2) 采用lambda表达式匿名方式:

std::thread t([](){
  // do something
});

(3) 类成员函数:

class MyClass {
public:
  void func() {
    // do something
  }
};
MyClass;
std::thread t(&MyClass::func, &obj);

(4)使用std::bind指定函数

void func(int arg1, int arg2) {
  // do something
}
std::thread t(std::bind(func, 1, 2));

一旦启动了线程,我们就需要明确是要等待它结束(join,与之会合),还是人有它独自运行(detach,与之分离)。

假定程序不等待线程结束,那么在线程运行结束之前,我们需保证它所访问的外部数据始终正确、有效。我们编写程序时不注意就会遇到的bug,不易查找。这并非是新的问题,在单线程代码中,试图访问已销毁的对象同样是未定义的行为。不过,因为使用多线程,所以我们可能经常面临这种生存期的问题。

我们看下面代码,当前线程的函数已返回,新线程却仍访问其局部变量,大错特错

void myfunc(std::vector<int>& a)
{
	 for (int j = 0; j < 5; ++j) {
        int b = 2 * a.at(0);  // 1、隐患:可能访问悬空引用
    }
};
void oops()
{
	std::vector<int> a{1};
	std::thread t(myfunc, ref(a));
	t.detach(); //2、不等待线程结束
} //3、新线程可能仍在运行,而主线程的函数却已结束

解决的两种方式:
(1)令线程函数完全自含,将数据复制到新线程内部,而不是共享数据;
(2)会合新线程,等待主线程的函数退出之前,新线程执行完毕。

1.2 等待线程完成

等待线程结束,一般调用成员函数join()实现,简单粗暴。如何查险线程结束与否,或限定只等待一段时间,那我们得该用其他方式,如条件变量和future。

join()仅能调用一次;只要std::thread对象曾经调用过join(),线程就不再可回合(joinnable),成员函数joinnable()将返回false。

看以下代码,利用RAII过程等待线程完结:

class thread_guard {
    std::thread& t;
public:
    explicit thread_guard(std::thread& t_):t(t_)
    {}
    ~thread_guard(){
        if(t.joinable()){
            t.join();
        }
    }
    thread_guard(thread_guard const&) = delete;
    thread_guard& operator = (thread_guard const&) = delete;

};

void oops(){
    std::vector<int> a{1};
    std::thread t(myfunc, ref(a));
    thread_guard g(t);
    t.detach(); 
}

2、如何将参数传递给线程

线程具有内部存储空间,参数会按照默认方式先复制到该处,新创建的线程才能直接访问它们。

//正确
void f(int i, std::string const& s);
std::thread t(f, 3, "hello"); //以char const* 传入

//可能会有局部数组销毁,造成未定义的行为
void opps(int some_param)
{
	char buffer[1024];
	std::thread t(f, 3, buffer); //传递buffer指针,会有局部数组销毁,造成未定义的行为
	t.detach();
}
//使用std::string避免悬空指针
void opps(int some_param)
{
	char buffer[1024];
	std::thread t(f, 3, std::string(buffer)); //使用std::string避免悬空指针
	t.detach();
}

若需要用引用方式传递参数,只要用std::ref()函数加以包装即可

下面是错误的例子

void update(w_data& data);
void oops()
{
	w_data data;
	std::thread t(data); //编译失败,线程的构造函数会将data对象复制,临时的副本,以右值的方式传入,但是update预期接受非const引用
}

若需按引用的方式传递参数,只要用std::ref()函数加以包装即可,把创建线程语句改写成:

void oops()
{
	w_data data;
	std::thread t(std::ref(data)); 
}

移动方式传参数,转移对象的归属权,典型std::unique_ptr :

void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr p(new big_object);
p->preare_data(42);
std::thread t(process_big_object, std::move(p));

其实std::thread和std::unique_ptr在C++11里一样只能移动不能复制。

3、线程归属权居然是可以转移的

如下代码,移交线程归属权

void some_function();
void some_other_function();
std::thread t1(some_function);
std::thread t2 = std::move(t1);
t1 = std::thread(some_other_function);
std::thread t3;
t3 = std::move(t2);
t1 = std::move(t3); //该赋值操作会终止程序整个程序,t1的线程正在启动

必须明确只要std::thread对象正在管控着一个线程,就不能简单地向它赋新值

从函数内部返回std::thread对象

std::thread f()
{
	void some_function();
	return std::thread(some_function);
}
std::thread g()
{
	void some_other_function(int);
	std::thread t(some_other_function, 42);
	return t
}

类似的,归属权可以转移到函数内部,函数就能够接收到std::thread实例作为按右值传递的参数,如下所示:

void f(std::thread t);
void g()
{
	void some_function();
	f(std::thread(some_function));
	std::thread t(some_function);
	f(std::move(t));
}

scoped_thread类及其用例

class scoped_thread {
    std::thread t;
public:
    explicit scoped_thread(std::thread t_):t(std::move(t_))
    {
     	if(!t.joinable()){
            throw std::logic_error("No thread");
        }
    }
    ~scoped_thread(){
        t.join();
    }
    scoped_thread(thread_guard const&) = delete;
    scoped_thread& operator = (thread_guard const&) = delete;

};

void oops(){
    std::vector<int> a{1};
    scoped_thread t(std::thread(func(a)));
}

使用std::vector装载,生成多个线程,并等待它们运行完成
我们把std::thread装进std::vector容器内,向线程管控的自动化迈进了一步,若要为多个线程分别直接创建独立变量,还不如将它们集结成组,统一处理。

void do_work(unsigned id);
void f()
{
	std::vector<std::thread> threads;
	for(unsigned i = 0; i < 20; i++)
	{
		threads.emplace_back(do_work, i);//生成线程
	}
	for(auto& entry: threads)
	{
    	entry.join(); //依次在各线程上调用join函数
    }
}

4、通过什么识别线程 - 一般算法常用

获取线程ID 方式1:通过t1.get_id()

std::thread t1(foo);
std::thread::id t1_id = t1.get_id();
std::cout << "t1 ID: " << t1_id << std::endl;
if (t1.joinable())
 {
       t1.join();
}

获取线程ID 方式2: 通过std::this_thread::get_id()

void thread_function()
{
    std::cout << "Thread ID is: " << std::this_thread::get_id() << std::endl;
}
 
int main()
{
    std::thread t1(thread_function);
 
    if (t1.joinable())
    {
        t1.join();
    }
     
    return 0;
}

5、控制线程操作 - 启动/暂停/停止/恢复

#include <future>
#include <thread>
using namespace std;
 
class MyThread
{
public:
	MyThread() = default;
	~MyThread() = default;
 
public:
    void RunThread();  //运行线程
    void StopThread(); //停止线程
    void PauseThread(); //暂停线程
    void ResumeThread();//恢复线程
private:
    condition_variable cv_;
    mutex mutex_;
    bool isStop_{false};
    bool isPause_{ false };
};
void MyThread::RunThread()
{
    thread t([&] {
        while (true)
        {
            if (!m_isStop)
            {
                std::cout << "Thread ID is: " << std::this_thread::get_id() << std::endl;
                this_thread::sleep_for(chrono::seconds(1));
                unique_lock<mutex> lock(mutex_);
                cv_.wait(lock, [this] {return !isPause_; });
            }
        }
        });
    t.detach();
}
void MyThread::StopThread()
{
    isStop_ = true;
}
void MyThread::PauseThread()
{
    unique_lock<mutex> lock(mutex_);
    isPause_ = true;
    cv_.notify_one();
}
void MyThread::ResumeThread()
{
    unique_lock<mutex> lock(mutex_);
    isPause_ = false;
    cv_.notify_one();
}
 
int main()
{
    MyThread mythread;
    mythread.RunThread();
    int ch;
    while ((ch = getchar()) != 'q')
    {
        switch (ch)
        {
        case 'p':
            mythread.PauseThread();
            break;
        case 'r':
            mythread.ResumeThread();
            break;
        case 's':
            mythread.StopThread();
            break;
        default:
            break;
        }
    }
    return 0;
}

6、小结

std::thread::hardware_concurrency 是一个 C++11 标准库函数,它返回当前系统支持的最大线程数量。函数返回值的是 unsigned int 类型的值。使用 std::thread::hardware_concurrency 能确定在某个系统上可以创建多少线程,从而优化多线程程序的性能。

但是,返回值并不一定是真正的系统最大线程数,而是一个估计值。在实际使用时应该对其进行测试和优化。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。