C++ STL 多线程库用法介绍

目录

一:Atomic:

二:Thread

 1. 创建线程 

2. 小心移动(std::move)线程 

 3. 如何创建带参数的线程

4. 线程参数是引用类型时,要小心谨慎。

 5. 获取线程ID

6. jthread

7. 如何在线程中使用中断 stop_token

三:如何解决数据竞争

1.有问题的代码 

2.使用互斥 

3.预防死锁

4. 自动释放锁

5. 延迟锁

6. 共享锁

7. 线程安全的初始化

四:线程局部存储

五:线程通信

1.条件变量

2. 防止虚假唤醒

3. 防止唤醒丢失

4.信号量

5. std::latch

6. std::barrier

六:任务

 1. std::promise, std::future

2. 用std::promise, std::future进行线程同步

3. std::async

4. std::packaged_task


一:Atomic:

#include <atomic>
#include <thread>
#include <iostream>

using namespace std;

// Flags for the store/load experiment. Both have static storage duration,
// so they start out as 0.
std::atomic<int> x, y;
// What each thread observed of the *other* thread's flag.
int r1, r2;

/// Publishes x, then records the value y had at that moment.
/// Assigning to and converting from std::atomic are sequentially consistent,
/// exactly like store()/load() with their default memory order.
void writeX() {
	x = 1;
	r1 = y;
}

/// Mirror image of writeX(): publishes y, then samples x.
void writeY() {
	y = 1;
	r2 = x;
}

/// Runs the store/load experiment 100 times and prints r1 followed by r2
/// for every round.
int main() {
	for (int i = 0; i < 100; i++)
	{
		// Reset both flags before starting the two threads.
		x = 0;
		y = 0;
		std::thread a(writeX);
		std::thread b(writeY);
		// r1 and r2 are plain ints; they are only read after both threads were joined.
		a.join();
		b.join();
		std::cout << r1 << r2 << std::endl;
	}
	return 0;
}
// Three outputs are possible: 01, 10, 11
// 01: thread a runs first, then thread b
// 10: thread b runs first, then thread a
// 11: the threads interleave - each one stores its own flag before either loads the other's
// 00 cannot occur because store()/load() default to sequentially consistent ordering.

二:Thread

 1. 创建线程 
#include <atomic>
#include <thread>
#include <iostream>

using namespace std;

/// Plain function used as a thread entry point; prints "function".
void helloFunction() {
	std::cout << "function" << std::endl;
}


/// Callable type used as a thread entry point; prints "function object".
struct HelloFunctionObject {
	void operator()() const {
		std::cout << "function object" << std::endl;
	}
};


/// Starts one thread for each kind of callable (function, function object,
/// lambda) and waits for all of them.
int main()
{
	thread t1(helloFunction); // function
	HelloFunctionObject helloFunctionObject;
	thread t2(helloFunctionObject); // function object
	thread t3([] { cout << "lambda function" << std::endl; }); // lambda function

	t1.join(); // join() is required: destroying a std::thread that is still joinable calls std::terminate
	t2.join();
	t3.join();
	return 0;
}
2. 小心移动(std::move)线程 
#include <atomic>
#include <thread>
#include <iostream>

using namespace std;

/// Demonstrates why move-assigning onto a std::thread needs care.
/// NOTE: this program terminates on purpose at the marked line.
int main()
{
	std::thread t([] { cout << "lambda function"; });
	std::thread t2;
	t2 = std::move(t); // fine: t2 was empty and now owns the first thread
	std::thread t3([] { cout << "lambda function"; });
	/* Problem: t2 already owns a thread, i.e. it is joinable. Move-assigning
	   onto a joinable std::thread calls std::terminate. To do this safely,
	   call t2.join() (or t2.detach()) before the assignment and join the
	   newly owned thread afterwards. */
	t2 = std::move(t3); // std::terminate
}
 3. 如何创建带参数的线程
#include <atomic>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

using namespace std;

// How to pass arguments to a thread function.

/// Receives its own copy of the string and prints it.
void printStringCopy(std::string s) {
	std::cout << s;
}

/// Receives a reference to a string owned elsewhere and prints it.
void printStringRef(const std::string& s) {
	std::cout << s;
}

/// Starts threads that receive a string by copy and by reference.
int main()
{
	string s{ "C++" };
	thread tPerCopy([=] { cout << s; }); // C++ (the lambda captures a copy)
	thread tPerCopy2(printStringCopy, s); // C++ (the thread stores a copy)
	tPerCopy.join();
	tPerCopy2.join();
	thread tPerReference([&] { cout << s; }); // C++ (the lambda captures a reference)
	// std::thread copies its arguments into internal storage, so passing `s`
	// directly would bind printStringRef's parameter to that copy. std::ref
	// is what actually gives the thread a reference to `s`. This is safe here
	// because `s` outlives the thread, which is joined below.
	thread tPerReference2(printStringRef, std::ref(s)); // C++
	tPerReference.join();
	tPerReference2.join();
}
4. 线程参数是引用类型时,要小心谨慎。
#include <iostream>

using namespace std;

using std::this_thread::sleep_for;
using std::this_thread::get_id;

/// Callable that keeps a reference to an int owned by its creator and adds
/// to it from whichever thread invokes it.
struct Sleeper {
	// Stores a reference, not a copy: the referenced int must outlive every use of this object.
	Sleeper(int& i_) :i{ i_ } {};
	/// Adds k to the referenced int six times (j = 0..5), sleeping 100 ms
	/// before each addition, then prints the id of the executing thread.
	void operator() (int k) {
		for (unsigned int j = 0; j <= 5; ++j) {
			sleep_for(std::chrono::milliseconds(100));
			i += k; // writes through the stored reference
		}
		std::cout << get_id(); // undefined behaviour
	}
private:
	int& i;
};


/// Demonstrates the hazard of giving a detached thread a reference to a
/// local variable. The problems are intentional.
int main()
{

	int valSleeper = 1000;
	// valSleeper is handed to the thread by reference. If main returns first, the
	// detached thread keeps using a destroyed variable (undefined behaviour). In
	// addition main and the thread share valSleeper without synchronisation, which
	// is a data race.
	std::thread t(Sleeper(valSleeper), 5); 
	t.detach();
	std::cout << valSleeper; // undefined behaviour
}
 5. 获取线程ID
using namespace std;
using std::this_thread::get_id;

/// Prints the hardware concurrency hint and the ids of two threads as seen
/// from inside and outside the threads, before and after swapping the two
/// std::thread objects. The numbers in the comments are sample output.
int main()
{
	std::cout << std::thread::hardware_concurrency() << std::endl; // 4
	std::thread t1([] { std::cout << get_id() << std::endl; }); // 139783038650112
	std::thread t2([] { std::cout << get_id() << std::endl; }); // 139783030257408
	std::cout << t1.get_id() << std::endl; // 139783038650112
	std::cout << t2.get_id() << std::endl; // 139783030257408

	// swap() exchanges which thread each object owns, so the ids trade places.
	t1.swap(t2);
	std::cout << t1.get_id() << std::endl; // 139783030257408
	std::cout << t2.get_id() << std::endl; // 139783038650112
	std::cout << get_id() << std::endl; // 140159896602432 (id of the main thread)
    
	t1.join();
	t2.join();
}
6. jthread
#include <atomic>
#include <thread>
#include <iostream>

using namespace std;
using std::this_thread::get_id;

// std::jthread is a thread that joins automatically in its destructor.
int main()
{
	std::jthread thr{ [] { std::cout << "std::jthread" << "\n"; } }; // std::jthread
	// joinable() returns bool; without std::boolalpha it is printed as 1.
	std::cout << "thr.joinable(): " << thr.joinable() << "\n"; // thr.joinable(): 1
}
7. 如何在线程中使用中断 stop_token
#include <atomic>
#include <thread>
#include <iostream>

using namespace std;
using std::this_thread::get_id;
using namespace::std::literals;//字面量,比如0.2s, C++20能识别这种写法

// (1) Non-interruptible thread: its callable takes no std::stop_token, so it
// has no way to notice a stop request and always runs all ten iterations.
std::jthread nonInterruptable([] {
	for (int counter = 0; counter < 10; ++counter) {
		std::this_thread::sleep_for(0.2s);
		std::cerr << "nonInterruptable: " << counter << std::endl;
	}
});
// (2) Interruptible thread: the callable receives the jthread's stop_token
// and polls it once per iteration.
std::jthread interruptable([](std::stop_token stoken) {
	for (int counter = 0; counter < 10; ++counter) {
		std::this_thread::sleep_for(0.2s);
		if (stoken.stop_requested()) return; // (3) leave as soon as a stop was requested
		std::cerr << "interruptable: " << counter << std::endl;
	}
});

/// Lets both global jthreads run for a second, then requests a stop on each.
int main()
{
	std::this_thread::sleep_for(1s);
	std::cerr << "Main thread interrupts both jthreads" << std::endl;
	nonInterruptable.request_stop(); // (4) stop requested; this thread never checks a stop_token, so it keeps running
	interruptable.request_stop(); // stop requested; this thread polls its stop_token and returns
}

三:如何解决数据竞争

1.有问题的代码 
#include <atomic>
#include <thread>
#include <iostream>

using namespace std;

/// Callable that reports three work steps, 200 ms apart, on std::cout
/// without any synchronisation between workers.
struct Worker {
	Worker(string n) :name(n) {};
	void operator() () {
		for (int i = 1; i <= 3; ++i) {
			this_thread::sleep_for(chrono::milliseconds(200));
            // The problem being demonstrated: std::cout is shared by all workers and
            // nothing here serialises the chained << calls, so the pieces of one line
            // can be interleaved with output from other threads.
			cout << name << ": " << "Work " << i << endl;
		}
	}
private:
	string name;
};


/// Runs four unsynchronised workers concurrently and waits for all of them.
int main()
{
	thread herb{ Worker("Herb") };
	thread andrei{ Worker(" Andrei") };
	thread scott{ Worker(" Scott") };
	thread bjarne{ Worker(" Bjarne") };

	// Wait for every worker before leaving main.
	herb.join();
	andrei.join();
	scott.join();
	bjarne.join();
}
2.使用互斥 
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>

using namespace std;

// Serialises access to std::cout across all workers.
std::mutex mutexCout;

/// Callable that reports three work steps, 200 ms apart, and holds
/// mutexCout while it prints each line.
struct Worker {
	Worker(std::string n) : name(n) {}

	void operator()() {
		for (int step = 1; step <= 3; ++step) {
			std::this_thread::sleep_for(std::chrono::milliseconds(200));
			// Manual lock/unlock around the whole output statement.
			mutexCout.lock();
			std::cout << name << ": " << "Work " << step << std::endl;
			mutexCout.unlock();
		}
	}

private:
	std::string name;
};

/// Runs four mutex-protected workers concurrently and waits for all of them.
int main()
{
	thread herb{ Worker("Herb") };
	thread andrei{ Worker("Andrei") };
	thread scott{ Worker("Scott") };
	thread bjarne{ Worker("Bjarne") };

	// Wait for every worker before leaving main.
	herb.join();
	andrei.join();
	scott.join();
	bjarne.join();
}
3.预防死锁
// Anti-pattern: manual lock()/unlock() is not exception-safe.
m.lock();
sharedVar = getVar(); // if this throws, m.unlock() is never reached: the mutex stays locked, other threads can never acquire it, and they may block forever
m.unlock();
#include <iostream>
#include <mutex>

using namespace std;

/// A resource guarded by its own mutex.
struct CriticalData {
	std::mutex mut;
};

/// Locks a's mutex, pauses for 1 ms, then locks b's mutex and finally
/// releases both. Two threads calling this with the arguments swapped can
/// each end up holding one mutex while waiting for the other.
void deadLock(CriticalData& a, CriticalData& b) {
	std::mutex& first = a.mut;
	std::mutex& second = b.mut;

	first.lock();
	std::cout << "get the first mutex\n";
	std::this_thread::sleep_for(std::chrono::milliseconds(1));
	second.lock();
	std::cout << "get the second mutex\n";
	first.unlock();
	second.unlock();
}

/// Starts two threads that take the same two mutexes in opposite order.
/// NOTE: this program is expected to hang; the deadlock is the point.
int main()
{
	CriticalData c1;
	CriticalData c2;
	// After taking their first mutex, t1 and t2 each wait for the other one to release its mutex.
	std::thread t1([&] { deadLock(c1, c2); });
	std::thread t2([&] { deadLock(c2, c1); });
	t1.join();
	t2.join();
}
4. 自动释放锁
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>

using namespace std;

// Serialises access to std::cout across all workers.
std::mutex mutexCout;

/// Callable that reports three work steps, 200 ms apart, printing each line
/// under a lock_guard on mutexCout.
struct Worker {
	Worker(std::string n) : name(n) {}

	void operator()() {
		for (int step = 1; step <= 3; ++step) {
			std::this_thread::sleep_for(std::chrono::milliseconds(200));
			// The guard unlocks mutexCout automatically at the end of each iteration.
			const std::lock_guard<std::mutex> outputGuard(mutexCout);
			std::cout << name << ": " << "Work " << step << std::endl;
		}
	}

private:
	std::string name;
};

/// Runs four lock_guard-protected workers concurrently and waits for all of them.
int main()
{
	thread herb{ Worker("Herb") };
	thread andrei{ Worker("Andrei") };
	thread scott{ Worker("Scott") };
	thread bjarne{ Worker("Bjarne") };

	// Wait for every worker before leaving main.
	herb.join();
	andrei.join();
	scott.join();
	bjarne.join();
}
5. 延迟锁
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>

using namespace std;

using namespace std;

/// A resource guarded by its own mutex.
struct CriticalData {
	mutex mut;
};

/// Prepares a deferred unique_lock for each mutex (nothing is locked yet),
/// then acquires both together through std::lock. Both locks are released
/// when the guards go out of scope.
void deadLockResolved(CriticalData& a, CriticalData& b) {
	const auto self = this_thread::get_id();

	unique_lock<mutex> firstGuard(a.mut, defer_lock);
	cout << self << ": get the first lock" << endl;
	this_thread::sleep_for(chrono::milliseconds(1));
	unique_lock<mutex> secondGuard(b.mut, defer_lock);
	cout << self << ": get the second lock" << endl;
	cout << self << ": atomic locking" << endl;
	// The only place where the mutexes are actually acquired.
	lock(firstGuard, secondGuard);
}

/// Starts two threads that need the same two mutexes in opposite order.
/// Each thread acquires both mutexes in a single std::lock call.
int main()
{
	CriticalData c1;
	CriticalData c2;
	thread t1([&] { deadLockResolved(c1, c2); });
	thread t2([&] { deadLockResolved(c2, c1); });

	t1.join();
	t2.join();
}
6. 共享锁
#include <mutex>
#include <shared_mutex> // std::shared_timed_mutex and std::shared_lock are declared here, not in <mutex>
// ...
// One mutex that supports exclusive (writer) and shared (reader) ownership.
std::shared_timed_mutex sharedMutex;
// Writer: std::unique_lock acquires the mutex exclusively.
std::unique_lock<std::shared_timed_mutex> writerLock(sharedMutex);
// Readers: std::shared_lock acquires the mutex in shared mode, so several
// readers may hold it at the same time.
// NOTE: the three locks are listed together only for brevity. Each one belongs
// in a different thread; a thread must not lock a mutex it already owns.
std::shared_lock<std::shared_timed_mutex> readerLock(sharedMutex);
std::shared_lock<std::shared_timed_mutex> readerLock2(sharedMutex);
7. 线程安全的初始化
// Constant expressions are thread-safe: they are evaluated at compile time,
// so no initialisation is left for threads to race on at run time.
/// Literal type wrapping a double so that objects can be declared constexpr.
struct MyDouble {
	constexpr MyDouble(double v) : val(v) {}
	// const is required: since C++14 a constexpr member function is no longer
	// implicitly const, and constexpr objects are const.
	constexpr double getValue() const { return val; }
private:
	double val;
};
// A constexpr object must be initialised by a constant expression.
constexpr MyDouble myDouble(10.5);
std::cout << myDouble.getValue(); // 10.5
// Static variable with block scope: it is initialised the first time control
// passes through its declaration, and the language guarantees (since C++11)
// that this initialisation happens exactly once even with concurrent callers.
void blockScope(){
static int MySharedDataInt= 2011;
}
//once_flag, call_once 
#include <mutex>
...
using namespace std;
// Records whether the callable guarded by call_once has already run.
once_flag onceFlag;
// May be called by any number of threads; call_once runs the lambda only
// once for onceFlag, so "Only once." is printed a single time.
void do_once(){
call_once(onceFlag, []{ cout << "Only once." << endl; });
}
// Both threads call do_once(). (Fragment: joining the threads is not shown.)
thread t1(do_once);
thread t2(do_once);

四:线程局部存储


// Serialises output from the threads.
std::mutex coutMutex;
// Every thread gets its own copy of s, initialised to "hello from ".
thread_local std::string s("hello from ");

/// Appends s2 to the calling thread's copy of s, then prints the string and
/// its address under coutMutex, followed by an empty line.
void addThreadLocal(std::string const& s2) {
	s.append(s2);
	const std::lock_guard<std::mutex> guard(coutMutex);
	std::cout << s << std::endl
	          << "&s: " << &s << std::endl
	          << std::endl;
}
// Four threads, each appending its own name to its own thread-local copy of s.
// (Fragment: joining the threads is not shown.)
std::thread t1(addThreadLocal, "t1");
std::thread t2(addThreadLocal, "t2");
std::thread t3(addThreadLocal, "t3");
std::thread t4(addThreadLocal, "t4");

五:线程通信

1.条件变量
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>

using namespace std;

// Shared state of the handshake: mutex_ protects dataReady, and condVar is
// what the worker waits on and the sender notifies.
std::mutex mutex_;
std::condition_variable condVar;
bool dataReady = false;
/// Stand-in for the real work; only prints a message.
void doTheWork() {
	std::cout << "Processing shared data." << std::endl;
}
/// Receiver side: blocks until dataReady is true, then does the work.
void waitingForWork() {
	std::cout << "Worker: Waiting for work." << std::endl;
	std::unique_lock<std::mutex> lck(mutex_);
	// The predicate is checked before blocking and after every wake-up, so a
	// notification sent before this call and spurious wake-ups are both handled.
	condVar.wait(lck, [] { return dataReady; });
	doTheWork();
	std::cout << "Work done." << std::endl;
}
/// Sender side: sets dataReady while holding mutex_ and wakes one waiter.
void setDataReady() {
	std::lock_guard<std::mutex> lck(mutex_);
	dataReady = true; // changed under the same mutex the waiter uses for its check
	std::cout << "Sender: Data is ready." << std::endl;
	condVar.notify_one();
}

/// Runs the waiter and the sender in separate threads and joins both.
int main()
{
	std::thread t1(waitingForWork);
	std::thread t2(setDataReady);
	t1.join();
	t2.join();
}
2. 防止虚假唤醒
//为了防止虚假唤醒,接收方每次被唤醒后都应重新检查条件(条件不满足就继续等待),且发送方应在通知前将条件置为true。
//dataReady = true; //发送方设置条件满足
//[] { return dataReady; } //接收方进行条件检查
3. 防止唤醒丢失
//如果发送方在接收方开始等待之前就发出了通知,这次通知会丢失,接收方可能永远等待下去,因此要做两件事:
//1: 理想情况是先等待、后发送唤醒;但线程的执行顺序无法保证,所以发送方必须在持有互斥锁时把条件置为true,然后再通知
//2: 在接收方的等待函数中要检查是否满足条件 [] { return dataReady; }; 这样即使通知已经提前发出,wait 也会因为条件已满足而立即返回
4.信号量
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <vector>

using namespace std;

// Data handed from the sender to the waiter.
std::vector<int> myVec;

std::counting_semaphore<1> prepareSignal(0); // (1) starts at 0, so acquire() blocks until release() is called
/// Sender: fills myVec and then signals the semaphore.
void prepareWork() {
	myVec.insert(myVec.end(), { 0, 1, 0, 3 });
	std::cout << "Sender: Data prepared." << '\n';
	prepareSignal.release(); // (2) signal only after the data has been written
}

/// Waiter: blocks on the semaphore, then patches element 2 of myVec and
/// prints every element followed by a space.
void completeWork() {
	std::cout << "Waiter: Waiting for data." << '\n';
	prepareSignal.acquire(); // (3) returns once the sender has released
	myVec[2] = 2;
	std::cout << "Waiter: Complete the work." << '\n';
	for (const int& value : myVec) {
		std::cout << value << " ";
	}
	std::cout << '\n';
}


/// Runs the sender and the waiter in separate threads and joins both.
int main()
{
	std::thread t1(prepareWork);
	std::thread t2(completeWork);
	t1.join();
	t2.join();
}
5. std::latch
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <vector>
#include <latch>

using namespace std;

// Serialises output from the boss and the workers.
std::mutex coutMutex;

// Counted down once by each of the two workers when its work is finished.
std::latch workDone(2);
std::latch goHome(1); // (5) counted down once by the boss to release the workers
/// Writes s to std::cout while holding coutMutex, so that a whole message is
/// emitted without output from other callers of this function in between.
/// @param s text to print; taken by const reference to avoid copying the
///          string on every call.
void synchronizedOut(const std::string& s) {
	std::lock_guard<std::mutex> lo(coutMutex);
	std::cout << s;
}


/// Callable for one worker: reports that its work is done, counts down
/// workDone, waits on goHome and then says good bye.
class Worker {
public:
	Worker(std::string n) : name(n) { }

	void operator()() {
		const std::string prefix = name + ": ";

		// Tell the boss that this worker has finished.
		synchronizedOut(prefix + "Work done!\n");
		workDone.count_down(); // (3) work completed
		// Stay until the boss sends everybody home.
		goHome.wait();
		synchronizedOut(prefix + "Good bye!\n");
	}

private:
	std::string name;
};



/// Boss: starts two workers, waits until both have counted down workDone,
/// then counts down goHome to release them and joins both threads.
int main()
{
	std::cout << "BOSS: START WORKING! " << '\n';
	Worker herb(" Herb"); // (1) worker 1
	std::thread herbWork(herb); // worker 1 has to finish its own work
	Worker scott(" Scott"); // (2) worker 2
	std::thread scottWork(scott);// worker 2 has to finish its own work
	workDone.wait(); // (4) blocks until both workers have counted down
	std::cout << '\n';
	goHome.count_down();// the boss sends everybody home
	std::cout << "BOSS: GO HOME!" << '\n';
	herbWork.join();
	scottWork.join();
}

6. std::barrier

#include <barrier>
#include <iostream>
#include <string>
#include <syncstream>
#include <thread>
#include <vector>

/// Three workers pass through two phases ("worked", "cleaned"), meeting at a
/// std::barrier after each phase; the barrier's completion function prints a
/// message between phases.
int main()
{
    const auto workers = { "Anil", "Busara", "Carl" };

    // Completion function: run by the barrier once per phase, after every
    // participant has arrived and before any of them is released.
    auto on_completion = []() noexcept
    {
        // locking not needed here
        static auto phase =
            "... done\n"
            "Cleaning up...\n";
        std::cout << phase;
        // From the second phase on only "... done" is printed.
        phase = "... done\n";
    };

    // One participant per worker.
    std::barrier sync_point(std::ssize(workers), on_completion);

    auto work = [&](std::string name)
    {
        std::string product = "  " + name + " worked\n";
        std::osyncstream(std::cout) << product;  // ok, op<< call is atomic
        sync_point.arrive_and_wait();

        product = "  " + name + " cleaned\n";
        std::osyncstream(std::cout) << product;
        sync_point.arrive_and_wait();
    };

    std::cout << "Starting...\n";
    std::vector<std::jthread> threads;
    threads.reserve(std::size(workers));
    for (auto const& worker : workers)
        threads.emplace_back(work, worker);
    // The jthreads join automatically when `threads` is destroyed.
}

六:任务

 1. std::promise, std::future
#include <future>
#include <iostream>

/// Computes a * b and publishes the result through intPromise, which makes
/// the associated future ready.
void product(std::promise<int>&& intPromise, int a, int b) {
	const int result = a * b;
	intPromise.set_value(result);
}
/// Computes 20 * 10 in a separate thread and receives the result via a
/// promise/future pair.
int main()
{
	int a = 20;
	int b = 10;
	std::promise<int> prodPromise;
	// The future must be obtained before the promise is moved into the thread.
	std::future<int> prodResult = prodPromise.get_future();
	std::jthread prodThread(product, std::move(prodPromise), a, b);
	std::cout << "20*10= " << prodResult.get(); // 20*10= 200 (get() blocks until the value is set)
}
2. 用std::promise, std::future进行线程同步
#include <future>
#include <iostream>

/// Stand-in for the real work; only prints a message.
void doTheWork() {
	std::cout << "Processing shared data." << std::endl;
}

/// Receiver side: blocks until the future becomes ready, then does the work.
void waitingForWork(std::future<void>&& fut) {
	std::cout << "Worker: Waiting for work." << std::endl;
	fut.wait(); // returns once the sender has called set_value()
	doTheWork();
	std::cout << "Work done." << std::endl;
}
/// Sender side: announces the data and makes the associated future ready.
void setDataReady(std::promise<void>&& prom) {
	std::cout << "Sender: Data is ready." << std::endl;
	prom.set_value(); // the notification itself; no value is transported
}

/// Uses a promise<void>/future<void> pair as a one-shot notification between
/// a waiting thread and a sending thread.
int main()
{
	std::promise<void> sendReady;
	auto fut = sendReady.get_future();
	std::jthread t1(waitingForWork, std::move(fut));
	std::jthread t2(setDataReady, std::move(sendReady));
	// Both jthreads join automatically at the end of main.

}
3. std::async
#include <future>
#include <iostream>

using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;

/// Compares when a deferred and an eagerly launched std::async task actually
/// start, measured relative to `begin`.
int main()
{
	auto begin = system_clock::now();
	// launch::deferred: the callable runs lazily, in the thread that calls get().
	auto asyncLazy = std::async(launch::deferred, [] { return system_clock::now(); });
	// launch::async: the callable runs in another thread.
	auto asyncEager = std::async(launch::async, [] { return system_clock::now(); });


	std::this_thread::sleep_for(std::chrono::seconds(1));
	auto lazyStart = asyncLazy.get() - begin; // the deferred task runs here, after the sleep
	auto eagerStart = asyncEager.get() - begin;
	auto lazyDuration = duration<double>(lazyStart).count();
	auto eagerDuration = duration<double>(eagerStart).count();
	std::cout << lazyDuration << " sec"; // 1.00018 sec.
	std::cout << eagerDuration << " sec"; // 0.00015489 sec.
}
#include <future>
#include <iostream>
#include <thread>

using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;

/// Computes the same value twice: once with a thread writing into a shared
/// variable, once with std::async returning the value through a future.
int main()
{
	int res;
	// Thread version: the result travels through a variable captured by reference.
	std::thread t([&] { res = 2000 + 11; });
	t.join(); // res is only read after the thread has finished
	std::cout << res << std::endl; // 2011


	auto fut = std::async([] { return 2000 + 11; });// asynchronous call; the result is delivered through the future
	std::cout << fut.get() << std::endl; // 2011
}
4. std::packaged_task
#include <future>
#include <iostream>
#include <queue>
#include <thread>

using namespace std;
using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;

/// Callable that adds every integer in the half-open range [beg, end) to a
/// running total and returns that total.
/// The total is kept in the object, so calling the same object again
/// continues from the previous result.
struct SumUp {
	/// @param beg first value to add
	/// @param end one past the last value to add; nothing is added when end <= beg
	/// @return the running total after adding the range
	int operator()(int beg, int end) {
		for (int i = beg; i < end; ++i) sum += i;
		return sum;
	}
private:
	int sum{ 0 };
};


/// Wraps two SumUp callables in packaged_tasks, runs each task in its own
/// thread on consecutive ranges of 5000 numbers, and prints the sum of both
/// results.
int main()
{
	SumUp sumUp1, sumUp2;
	packaged_task<int(int, int)> sumTask1(sumUp1);// task 1
	packaged_task<int(int, int)> sumTask2(sumUp2);// task 2
	// The futures must be obtained before the tasks are moved away.
	future<int> sum1 = sumTask1.get_future(); // result of task 1
	future<int> sum2 = sumTask2.get_future(); // result of task 2
	deque< packaged_task<int(int, int)>> allTasks; // holds all tasks
	allTasks.push_back(move(sumTask1));// enqueue task 1
	allTasks.push_back(move(sumTask2));// enqueue task 2
	int begin{ 1 };
	int increment{ 5000 };
	int end = begin + increment;
	while (not allTasks.empty()) {
		packaged_task<int(int, int)> myTask = move(allTasks.front());// take one task out of the queue
		allTasks.pop_front();
		thread sumThread(move(myTask), begin, end);// run the task on [begin, end)
		// Advance to the next range of 5000 numbers.
		begin = end;
		end += increment;
		// The thread is detached; the results are collected through the futures below.
		sumThread.detach();
	}
	auto sum = sum1.get() + sum2.get();// get() blocks until the task has stored its result
	cout << sum;

}