1 2 3 4 5 6 7 8 9 10 11 12 13 14 | #include <thread> void func() { // do some work } int main() { std:: thread t(func); t.join(); return 0; } |
上例中,t 是一个线程对象,函数func()运行于该线程中。对join()函数的调用将使调用线程(本例是指主线程)一直处于阻塞状态,直到正在执行的线程t执行结束。如果线程函数返回某个值,该值也将被忽略。不过,该函数可以接收任意数量的参数。
1 2 3 4 5 6 7 8 9 10 11 12 | void func( int i, double d, const std::string& s) { std::cout << i << ", " << d << ", " << s << std::endl; } int main() { std:: thread t(func, 1, 12.50, "sample" ); t.join(); return 0; } |
尽管可以向线程函数传递任意数量的参数,但是所有的参数应当按值传递。如果需要将参数按引用传递,那要向下例所示那样,必须将参数用std::ref 或者std::cref进行封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | void func( int & a) { a++; } int main() { int a = 42; std:: thread t(func, std::ref(a)); t.join(); std::cout << a << std::endl; return 0; } |
Detach: 允许执行该方法的线程脱离其线程对象而继续独立执行。脱离后的线程不再是可结合线程(你不能等待它们执行结束)。
1 2 3 4 5 6 7 | int main() { std:: thread t(funct); t.detach(); return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 | try { std:: thread t1(func); std:: thread t2(func); t1.join(); t2.join(); } catch ( const std::exception& ex) { std::cout << ex.what() << std::endl; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | std::mutex g_mutex; std::vector<std::exception_ptr> g_exceptions; void throw_function() { throw std::exception( "something wrong happened" ); } void func() { try { throw_function(); } catch (...) { std::lock_guard<std::mutex> lock(g_mutex); g_exceptions.push_back(std::current_exception()); } } int main() { g_exceptions.clear(); std:: thread t(func); t.join(); for (auto& e : g_exceptions) { try { if (e != nullptr) { std::rethrow_exception(e); } } catch ( const std::exception& e) { std::cout << e.what() << std::endl; } } return 0; } |
在深入学习之前,有一点需要注意 <thread>头文件在命名空间std::this_thread中提供了一些帮助函数:
- get_id: 返回当前线程的id.
- yield:在处于等待状态时,可以让调度器先运行其他可用的线程。
- sleep_for:阻塞当前线程,时间不少于其参数指定的时间。
- sleep_util:在参数指定的时间到达之前,使当前线程一直处于阻塞状态。
在上面的例子中,我需要对vector g_exceptions进行同步访问,以确保在同一时间只能有一个线程向其中添加新元素。为此,我使用了互斥量,并对该互斥进行加锁。互斥量是一个核心同步原语,C++ 11的<mutex>头文件里包含了四种不同的互斥量。
- Mutex: 提供了核心函数 lock() 和 unlock(),以及非阻塞方法的try_lock()方法,一旦互斥量不可用,该方法会立即返回。
- Recursive_mutex:允许在同一个线程中对一个互斥量的多次请求。
- Timed_mutex:同上面的mutex类似,但它还有另外两个方法 try_lock_for() 和 try_lock_until(),分别用于在某个时间段里或者某个时刻到达之间获取该互斥量。
- Recursive_timed_mutex: 结合了timed_mutex 和recuseive_mutex的使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | #include <iostream> #include <thread> #include <mutex> #include <chrono> std::mutex g_lock; void func() { g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds( rand () % 10)); std::cout << "leaving thread " << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { srand ((unsigned int ) time (0)); std:: thread t1(func); std:: thread t2(func); std:: thread t3(func); t1.join(); t2.join(); t3.join(); return 0; } |
1 2 3 4 5 6 | entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | template < typename T> class container { std::mutex _lock; std::vector<T> _elements; public : void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange( int num, ...) { va_list arguments; va_start (arguments, num); for ( int i = 0; i < num; i++) { _lock.lock(); add( va_arg (arguments, T)); _lock.unlock(); } va_end (arguments); } void dump() { _lock.lock(); for (auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container< int >& cont) { cont.addrange(3, rand (), rand (), rand ()); } int main() { srand ((unsigned int ) time (0)); container< int > cont; std:: thread t1(func, std::ref(cont)); std:: thread t2(func, std::ref(cont)); std:: thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | template < typename T> class container { std::mutex _lock; std::vector<T> _elements; public : void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange( int num, ...) { va_list arguments; va_start (arguments, num); for ( int i = 0; i < num; i++) { _lock.lock(); add( va_arg (arguments, T)); _lock.unlock(); } va_end (arguments); } void dump() { _lock.lock(); for (auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container< int >& cont) { cont.addrange(3, rand (), rand (), rand ()); } int main() { srand ((unsigned int ) time (0)); container< int > cont; std:: thread t1(func, std::ref(cont)); std:: thread t2(func, std::ref(cont)); std:: thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; } |
1 2 3 4 5 6 7 8 9 | 6334 18467 41 6334 18467 41 6334 18467 41 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | template < typename T> class container { std::recursive_mutex _lock; std::vector<T> _elements; public : void add(T element) { std::lock_guard<std::recursive_mutex> locker(_lock); _elements.push_back(element); } void addrange( int num, ...) { va_list arguments; va_start (arguments, num); for ( int i = 0; i < num; i++) { std::lock_guard<std::recursive_mutex> locker(_lock); add( va_arg (arguments, T)); } va_end (arguments); } void dump() { std::lock_guard<std::recursive_mutex> locker(_lock); for (auto e : _elements) std::cout << e << std::endl; } }; |
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)’ : cannot convert parameter 1 from ‘const std::recursive_mutex’ to ‘std::recursive_mutex &’
1 2 3 4 5 6 7 8 9 10 11 12 13 | template < typename T> class container { mutable std::recursive_mutex _lock; std::vector<T> _elements; public : void dump() const { std::lock_guard<std::recursive_mutex> locker(_lock); for (auto e : _elements) std::cout << e << std::endl; } }; |
- defer_lock of type defer_lock_t:不获取互斥量的拥有权
- try_to_lock of type try_to_lock_t:在不阻塞的情况下试图获取互斥量的拥有权
- adopte_lock of type adopt_lock_t:假设调用线程已经拥有互斥量的所有权
1 2 3 4 5 6 7 | struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { }; constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t(); |
- lock:使用一种可以避免死锁的算法对互斥量加锁(通过调用lock(),try_lock()和unlock()).
- try_lock():按照互斥量被指定的顺序,试着通过调用try_lock()来对多个互斥量加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | template < typename T> class container { public : std::mutex _lock; std::set<T> _elements; void add(T element) { _elements.insert(element); } void remove (T element) { _elements.erase(element); } }; void exchange(container< int >& cont1, container< int >& cont2, int value) { cont1._lock.lock(); std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock cont2._lock.lock(); cont1. remove (value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | int main() { srand ((unsigned int ) time (NULL)); container< int > cont1; cont1.add(1); cont1.add(2); cont1.add(3); container< int > cont2; cont2.add(4); cont2.add(5); cont2.add(6); std:: thread t1(exchange, std::ref(cont1), std::ref(cont2), 3); std:: thread t2(exchange, std::ref(cont2), std::ref(cont1), 6) t1.join(); t2.join(); return 0; } |
1 2 3 4 5 6 7 8 9 10 | void exchange(container< int >& cont1, container< int >& cont2, int value) { std::lock(cont1._lock, cont2._lock); cont1. remove (value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); } |
条件变量C++11 还提供了另外一种同步原语,就是条件变量,它能使一个或多个线程进入阻塞状态,直到接到另一个线程的通知,或者发生超时或虚假唤醒时,才退出阻塞.在头文件<condition_variable> 里对条件变量有两种实现:
下面来讲讲条件变量的工作原理: 至少有一个线程在等待某个条件变为true。等待的线程必须先获取unique_lock 锁。该锁被传递给wait()方法,wait()方法会释放互斥量,并将线程挂起,直到条件变量接收到信号。收到信号后,线程会被唤醒,同时该锁也会被重新获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | #include <thread> #include <mutex> #include <condition_variable> #include <iostream> #include <queue> #include <random> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue< int > g_codes; bool g_done; bool g_notified; void workerfunc( int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true ; g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while (!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while (!g_notified) // used to avoid spurious wakeups { g_queuecheck.wait(locker); } // if there are error codes in the queue process them while (!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false ; } } int main() { // initialize a random generator std::mt19937 generator((unsigned int )std::chrono::system_clock::now().time_since_epoch().count()); // start the logger std:: thread loggerthread(loggerfunc); // start the working threads std::vector<std:: thread > threads; for ( int i = 0; i < 5; ++i) { threads.push_back(std:: thread (workerfunc, i+1, std::ref(generator))); } // work for the workers to finish for (auto& t : threads) t.join(); // notify the logger to finish and wait for it g_done = true ; loggerthread.join(); return 0; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | [logger] running... [worker 1] running... [worker 2] running... [worker 3] running... [worker 4] running... [worker 5] running... [worker 1] an error occurred: 101 [worker 2] an error occurred: 201 [logger] processing error: 101 [logger] processing error: 201 [worker 5] an error occurred: 501 [logger] processing error: 501 [worker 3] an error occurred: 301 [worker 4] an error occurred: 401 [logger] processing error: 301 [logger] processing error: 401 |
- 第一个重载带有锁unique_lock;这个重载方法可以释放锁,阻塞线程,并把线程添加到正在等待这一条件变量的线程队列里面。当该条件变量收到信号或者发生虚假唤醒时,线程就会被唤醒。它们其中任何一个发生时,锁都会被重新获取,函数返回。
- 第二个重载除了带有锁unique_lock外,还带有循环判定直到返回false值;这个重载是用来避免发生虚假唤醒。它基本上等价于下面的语句:
1 2 | while (!predicate()) wait(lock); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | void workerfunc( int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while (!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){ return !g_codes.empty();}); // if there are error codes in the queue process them while (!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } } } |
- Wait_for: 在条件变量收到信号或者指定的超时发生前,线程一直处于阻塞状态;
- Wait_until:在条件变量收到信号或者指定的时刻到达之前,线程一直处于阻塞状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true ; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { // initialize a random generator std::mt19937 generator((unsigned int )std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std:: thread worker(workerfunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while (!g_done) // avoid spurious wake-ups g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; } |
1 2 3 4 5 6 | main running... worker running... main crunching... worker finished... main waiting for worker... main finished... |
1 2 3 4 5 6 | main running... worker running... main crunching... main waiting for worker... worker finished... main finished... |