C++ 现代并发编程核心知识点总结 本文档总结了 C++11 至 C++20 中关于多线程管理、同步机制、互斥锁及条件变量的最佳实践与核心概念。
1. 线程管理 (std::thread) 1.1 join 与 detach 的区别
特性
join() (汇合)
detach() (分离)
阻塞性
阻塞 。主线程等待子线程结束。
非阻塞 。主线程继续运行,子线程后台独立运行。
资源回收
join 返回时回收资源。
子线程结束时由系统自动回收。
安全性
安全,保证子线程任务完成。
危险,需确保子线程访问的外部引用生命周期有效。
必选规则
std::thread 析构前必须调用两者之一,否则程序 Crash (std::terminate)。
场景1:使用 join() 进行数据处理 场景 :主线程需要等待子线程完成数据计算后再使用结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 #include <iostream> #include <thread> #include <vector> void calculate_sum (const std::vector<int >& data, int & result) { result = 0 ; for (int num : data) { result += num; } } int main () { std::vector<int > data = {1 , 2 , 3 , 4 , 5 }; int result = 0 ; std::thread worker (calculate_sum, std::ref(data), std::ref(result)) ; worker.join (); std::cout << "Sum: " << result << std::endl; return 0 ; }
场景2:使用 detach() 进行日志记录 场景 :后台日志写入不阻塞主业务逻辑,但要确保主程序不立即退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #include <iostream> #include <thread> #include <chrono> #include <fstream> class Logger {public : static void log_async (std::string message) { std::thread ([msg = std::move (message)]() { std::ofstream file ("app.log" , std::ios::app); file << msg << std::endl; }).detach (); } }; int main () { Logger::log_async ("Application started" ); std::cout << "Doing work..." << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); return 0 ; }
1.2 关键误区:Detach 与 进程生命周期
问题 :使用了 detach() 且子线程是死循环,为什么主线程 main 返回后子线程也停止了?
原因 :main 函数返回意味着 进程(Process)结束 。
机制 :操作系统会回收进程的所有资源,强制杀死该进程下属的所有线程(无论是否 detach)。
结论 :detach() 只是切断 std::thread 对象与后台线程的联系,并没有赋予线程超越进程生命周期的能力。要让子线程一直运行,主线程必须保持存活(如使用 while(true) 或 join)。
2. 互斥与锁管理 (RAII) 原则 :永远不要手动调用 .lock() 和 .unlock(),必须使用 RAII 包装器管理锁的生命周期。
2.1 互斥量 (Mutex)
std::mutex (C++11):最通用、最轻量。
std::shared_mutex (C++17):读写锁 。适用于“读多写少”场景。
读操作:共享(多个线程可同时读)。
写操作:独占(写时禁止任何读写)。
场景:使用 std::shared_mutex 实现缓存系统 场景 :多线程读取缓存(高频),偶尔更新缓存(低频)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 #include <iostream> #include <thread> #include <shared_mutex> #include <unordered_map> #include <string> #include <vector> class Cache { std::unordered_map<std::string, std::string> data_; mutable std::shared_mutex mutex_; public : std::string get (const std::string& key) const { std::shared_lock lock (mutex_) ; auto it = data_.find (key); return it != data_.end () ? it->second : "" ; } void set (const std::string& key, const std::string& value) { std::unique_lock lock (mutex_) ; data_[key] = value; } }; int main () { Cache cache; cache.set ("user:1" , "Alice" ); std::vector<std::thread> readers; for (int i = 0 ; i < 10 ; ++i) { readers.emplace_back ([&cache]() { std::cout << "Read: " << cache.get ("user:1" ) << std::endl; }); } for (auto & t : readers) { t.join (); } return 0 ; }
2.2 锁管理器 (Lock Guards) - 推荐做法
C++ 版本
工具
描述
C++17 (首选)
std::scoped_lock
最推荐 。支持同时锁定多个 mutex(防死锁),作用域结束自动解锁。
C++11/14
std::unique_lock
灵活。允许中途手动解锁/加锁,配合 condition_variable 必须使用它。
C++17
std::shared_lock
配合 shared_mutex 使用,获取“读锁”(共享权)。
C++11
std::lock_guard
基础款,功能单一(构造加锁,析构解锁)。
场景1:std::scoped_lock 防死锁的银行转账 场景 :两个线程同时在两个账户间转账,避免死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <iostream> #include <thread> #include <mutex> class BankAccount { int balance_; std::mutex mutex_; public : BankAccount (int balance) : balance_ (balance) {} friend void transfer (BankAccount& from, BankAccount& to, int amount) { std::scoped_lock lock (from.mutex_, to.mutex_) ; if (from.balance_ >= amount) { from.balance_ -= amount; to.balance_ += amount; std::cout << "Transferred " << amount << std::endl; } } int balance () const { return balance_; } }; int main () { BankAccount alice (1000 ) , bob (500 ) ; std::thread t1 ([&]() { transfer(alice, bob, 200 ); }) ; std::thread t2 ([&]() { transfer(bob, alice, 100 ); }) ; t1.join (); t2.join (); std::cout << "Alice: " << alice.balance () << ", Bob: " << bob.balance () << std::endl; return 0 ; }
场景2:std::unique_lock 配合条件变量的生产者-消费者 场景 :需要在 wait 时释放锁,其他线程才能访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> class MessageQueue { std::queue<int > queue_; std::mutex mutex_; std::condition_variable cv_; public : void produce (int value) { std::unique_lock lock (mutex_) ; queue_.push (value); cv_.notify_one (); } int consume () { std::unique_lock lock (mutex_) ; cv_.wait (lock, [this ]{ return !queue_.empty (); }); int value = queue_.front (); queue_.pop (); return value; } }; int main () { MessageQueue mq; std::thread producer ([&]() { for (int i = 0 ; i < 5 ; ++i) { mq.produce(i); std::cout << "Produced: " << i << std::endl; } }) ; std::thread consumer ([&]() { for (int i = 0 ; i < 5 ; ++i) { int value = mq.consume(); std::cout << "Consumed: " << value << std::endl; } }) ; producer.join (); consumer.join (); return 0 ; }
3. 同步原语对比 3.1 工具选型指南
场景
推荐工具
C++ 版本
简单计数/Flag
std::atomic<T>
C++11
等待 N 个线程初始化
std::latch
C++20
阶段性同步 (屏障)
std::barrier
C++20
资源计数/限流
std::counting_semaphore
C++20
复杂逻辑等待
std::condition_variable
C++11
获取异步返回值
std::future / std::async
C++11
3.2 深度解析:std::latch (C++20)
定义 :一次性倒数计数器(门闩)。
特点 :
无需 Mutex,轻量高效。
一次性 :计数减到 0 后不可复用。
典型用法 :主线程等待 K 个 Worker 线程初始化完毕。
Worker: latch.count_down()
Main: latch.wait()
场景:分布式任务启动协调器 场景 :主线程等待 5 个微服务全部启动完成后才开始接受请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 #include <iostream> #include <thread> #include <latch> #include <chrono> #include <vector> class ServiceManager { std::latch startup_latch_; public : ServiceManager (int service_count) : startup_latch_ (service_count) {} void start_service (const std::string& name) { std::thread ([this , name]() { std::this_thread::sleep_for (std::chrono::milliseconds (100 * (rand () % 5 ))); std::cout << name << " started!" << std::endl; startup_latch_.count_down (); }).detach (); } void wait_all_ready () { std::cout << "Waiting for all services..." << std::endl; startup_latch_.wait (); std::cout << "All services ready! Accepting requests." << std::endl; } }; int main () { ServiceManager manager (5 ) ; manager.start_service ("Database" ); manager.start_service ("Cache" ); manager.start_service ("MessageQueue" ); manager.start_service ("Auth" ); manager.start_service ("Logger" ); manager.wait_all_ready (); std::this_thread::sleep_for (std::chrono::seconds (2 )); return 0 ; }
3.3 深度解析:信号量 vs 条件变量
**std::counting_semaphore (有记忆)**:本质是计数器。生产者 release() 会增加计数,即使消费者没在等,信号也会保留(快递放门口)。
**std::condition_variable (无记忆)**:本质是通知。生产者 notify() 时如果消费者没在 wait(),信号就丢失了(按门铃没听见)。
场景1:std::counting_semaphore 实现连接池 场景 :限制最多 3 个并发数据库连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <iostream> #include <thread> #include <semaphore> #include <chrono> #include <vector> class ConnectionPool { std::counting_semaphore<3 > semaphore_{3 }; public : void execute_query (int task_id) { semaphore_.acquire (); std::cout << "Task " << task_id << " acquired connection" << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); semaphore_.release (); std::cout << "Task " << task_id << " released connection" << std::endl; } }; int main () { ConnectionPool pool; std::vector<std::thread> tasks; for (int i = 0 ; i < 10 ; ++i) { tasks.emplace_back ([&pool, i]() { pool.execute_query (i); }); } for (auto & t : tasks) { t.join (); } return 0 ; }
场景2:std::barrier 实现多阶段并行计算 场景 :MapReduce 需要所有 Map 任务完成后才能开始 Reduce。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #include <iostream> #include <thread> #include <barrier> #include <vector> class MapReduce { std::barrier<> sync_point_; std::vector<int > results_; public : MapReduce (int worker_count) : sync_point_ (worker_count), results_ (worker_count) {} void run_worker (int id) { results_[id] = id * 2 ; std::cout << "Worker " << id << " finished Map" << std::endl; sync_point_.arrive_and_wait (); int sum = 0 ; for (int val : results_) sum += val; std::cout << "Worker " << id << " computed total: " << sum << std::endl; } }; int main () { MapReduce mr (4 ) ; std::vector<std::thread> workers; for (int i = 0 ; i < 4 ; ++i) { workers.emplace_back ([&mr, i]() { mr.run_worker (i); }); } for (auto & t : workers) { t.join (); } return 0 ; }
4. 条件变量详解 (std::condition_variable) 4.1 wait 谓词逻辑 (Predicate) 代码范式:
1 cv.wait (lock, []{ return 条件; });
等价于:
1 2 3 while (!条件) { cv.wait (lock); }
为什么需要谓词 :防止虚假唤醒(spurious wakeup)。
场景:线程安全的事件通知系统 场景 :UI 线程等待后台任务完成后更新界面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> class TaskExecutor { std::mutex mutex_; std::condition_variable cv_; bool task_completed_ = false ; std::string result_; public : void execute_task () { std::thread ([this ]() { std::this_thread::sleep_for (std::chrono::seconds (2 )); { std::unique_lock lock (mutex_); result_ = "Task result" ; task_completed_ = true ; } cv_.notify_one (); }).detach (); } std::string wait_for_result () { std::unique_lock lock (mutex_) ; cv_.wait (lock, [this ]{ return task_completed_; }); return result_; } }; int main () { TaskExecutor executor; std::cout << "Starting task..." << std::endl; executor.execute_task (); std::cout << "Waiting for result..." << std::endl; std::string result = executor.wait_for_result (); std::cout << "Got result: " << result << std::endl; std::this_thread::sleep_for (std::chrono::seconds (1 )); return 0 ; }
4.2 wait_for 与 wait_until - 超时等待 区别 :
wait_for(lock, duration, predicate):等待相对时间
wait_until(lock, time_point, predicate):等待绝对时间
场景:带超时的请求处理 场景 :等待响应最多 5 秒,超时返回错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> class ResponseWaiter { std::mutex mutex_; std::condition_variable cv_; bool response_received_ = false ; public : void receive_response () { std::this_thread::sleep_for (std::chrono::seconds (3 )); std::unique_lock lock (mutex_) ; response_received_ = true ; cv_.notify_one (); } bool wait_with_timeout () { std::unique_lock lock (mutex_) ; bool success = cv_.wait_for (lock, std::chrono::seconds (5 ), [this ]{ return response_received_; }); return success; } }; int main () { ResponseWaiter waiter; std::thread responder ([&]() { waiter.receive_response(); }) ; if (waiter.wait_with_timeout ()) { std::cout << "Response received in time!" << std::endl; } else { std::cout << "Timeout: No response" << std::endl; } responder.join (); return 0 ; }
4.3 为什么 wait 必须使用 unique_lock? std::condition_variable::wait 函数要求传入 std::unique_lock<std::mutex>,而不能是 std::lock_guard,原因如下:
原子解锁与挂起 : 当线程调用 wait 时,它必须释放锁 并进入睡眠状态。这一系列操作必须是原子的(或者至少在逻辑上是安全的),以避免“丢失唤醒”问题(即在释放锁和进入睡眠之间,另一个线程发送了通知)。
重新加锁 : 当线程被唤醒(无论是被 notify 还是虚假唤醒)时,它在 wait 函数返回之前,必须重新获取锁 ,以保证后续对共享数据的访问是线程安全的。
接口支持 :
std::lock_guard 是纯粹的 RAII 包装器,构造时加锁,析构时解锁,没有提供手动 lock() 和 unlock() 接口 。
std::unique_lock 提供了 lock()、unlock() 等成员函数,允许 condition_variable 在内部灵活地控制锁的状态(先解锁 -> 等待 -> 唤醒后加锁)。
总结 :condition_variable 需要在等待期间暂时释放锁,并在唤醒后重新持有锁,只有 unique_lock 支持这种灵活的锁操作。
4.4 常见错误:std::vector<std::condition_variable> 问题 :尝试定义 std::vector<std::condition_variable> 会导致编译错误。
原因 :std::condition_variable 是 不可复制(Non-copyable) 且 不可移动(Non-movable) 的对象。
std::vector 在扩容(resize)时需要将旧元素移动或复制到新内存块中。
既然 condition_variable 无法移动或复制,它就不能直接作为 std::vector 的元素。
解决方案 :
**使用 std::deque 或 std::list**: 如果不需要连续内存,可以使用 std::list,它不会在添加元素时移动现有元素。
1 2 std::list<std::condition_variable> cv_list; cv_list.emplace_back ();
使用智能指针 (推荐): 存储 std::unique_ptr<std::condition_variable>。指针本身是可移动的,完全符合 std::vector 的要求。
1 2 3 std::vector<std::unique_ptr<std::condition_variable>> cv_vec; cv_vec.push_back (std::make_unique <std::condition_variable>());
5. 原子操作与无锁编程 (std::atomic) 5.1 原子操作的优势
无锁 :不需要 mutex,性能更高。
适用场景 :简单计数、标志位、状态机。
局限性 :只适用于简单数据类型,复杂逻辑仍需 mutex。
场景1:线程安全的计数器 场景 :多线程统计请求数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include <iostream> #include <thread> #include <atomic> #include <vector> class RequestCounter { std::atomic<int > count_{0 }; public : void increment () { count_.fetch_add (1 , std::memory_order_relaxed); } int get_count () const { return count_.load (std::memory_order_relaxed); } }; int main () { RequestCounter counter; std::vector<std::thread> threads; for (int i = 0 ; i < 10 ; ++i) { threads.emplace_back ([&counter]() { for (int j = 0 ; j < 1000 ; ++j) { counter.increment (); } }); } for (auto & t : threads) { t.join (); } std::cout << "Total requests: " << counter.get_count () << std::endl; return 0 ; }
场景2:线程安全的单例模式(Double-Checked Locking) 场景 :延迟初始化单例对象,避免每次都加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <iostream> #include <atomic> #include <mutex> class Singleton { static std::atomic<Singleton*> instance_; static std::mutex mutex_; Singleton () { std::cout << "Singleton constructed" << std::endl; } public : static Singleton* get_instance () { Singleton* tmp = instance_.load (std::memory_order_acquire); if (tmp == nullptr ) { std::lock_guard lock (mutex_) ; tmp = instance_.load (std::memory_order_relaxed); if (tmp == nullptr ) { tmp = new Singleton (); instance_.store (tmp, std::memory_order_release); } } return tmp; } }; std::atomic<Singleton*> Singleton::instance_{nullptr }; std::mutex Singleton::mutex_; int main () { Singleton* s1 = Singleton::get_instance (); Singleton* s2 = Singleton::get_instance (); std::cout << "Same instance: " << (s1 == s2) << std::endl; return 0 ; }
6. std::future 与 std::async - 异步编程全解 6.1 核心概念
**std::async**:启动异步任务,返回 std::future。
**std::future**:代表未来的结果,调用 .get() 获取(阻塞)。
**std::promise**:主动设置异步结果的”承诺”。
**std::packaged_task**:可调用对象的包装器,自动关联 future。
6.2 异步工具对比表
工具
用途
特点
适用场景
std::async
快速启动异步任务
自动管理线程,简单易用
独立计算任务
std::promise
手动设置结果
灵活控制时机,支持跨线程通信
事件驱动、回调
std::packaged_task
包装函数对象
可延迟执行,适配线程池
任务队列系统
std::future
获取异步结果
只能 get() 一次,移动语义
所有异步场景
std::shared_future
多线程共享结果
可多次 get(),拷贝语义
广播结果
6.3 std::async 详解 启动策略对比 1 2 3 4 5 6 7 8 auto fut1 = std::async (std::launch::async, task);auto fut2 = std::async (std::launch::deferred, task);auto fut3 = std::async (task);
关键差异 :
async:真正的并行,立即占用线程资源。
deferred:串行执行,节省资源但无并行效果。
默认策略:可能导致不确定行为,生产环境建议显式指定 。
场景1:并行计算多个HTTP请求 场景 :同时发起多个网络请求,等待所有结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <iostream> #include <future> #include <vector> #include <chrono> #include <string> std::string fetch_url (const std::string& url) { std::this_thread::sleep_for (std::chrono::seconds (1 )); return "Data from " + url; } int main () { std::vector<std::string> urls = { "api.example.com/users" , "api.example.com/posts" , "api.example.com/comments" }; std::vector<std::future<std::string>> futures; for (const auto & url : urls) { futures.push_back ( std::async (std::launch::async, fetch_url, url) ); } for (auto & fut : futures) { std::cout << fut.get () << std::endl; } return 0 ; }
场景2:异常传播 场景 :子线程抛出异常,主线程捕获处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #include <iostream> #include <future> #include <stdexcept> int risky_task (int value) { if (value < 0 ) { throw std::runtime_error ("Negative value!" ); } return value * 2 ; } int main () { auto fut = std::async (std::launch::async, risky_task, -5 ); try { int result = fut.get (); std::cout << "Result: " << result << std::endl; } catch (const std::exception& e) { std::cout << "Caught: " << e.what () << std::endl; } return 0 ; }
6.4 std::promise 手动设置结果 核心用法 :
1 2 3 4 5 6 7 8 9 10 std::promise<int > promise; std::future<int > future = promise.get_future (); promise.set_value (42 ); promise.set_exception (exception_ptr); int result = future.get ();
场景1:回调风格的异步操作 场景 :模拟异步文件读取,完成后通过 promise 通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <iostream> #include <thread> #include <future> #include <fstream> void async_read_file (const std::string& filename, std::promise<std::string> promise) { std::thread ([filename, p = std::move (promise)]() mutable { try { std::this_thread::sleep_for (std::chrono::seconds (1 )); std::string content = "File content from " + filename; p.set_value (content); } catch (...) { p.set_exception (std::current_exception ()); } }).detach (); } int main () { std::promise<std::string> promise; std::future<std::string> future = promise.get_future (); async_read_file ("data.txt" , std::move (promise)); std::cout << "Waiting for file..." << std::endl; std::string content = future.get (); std::cout << content << std::endl; std::this_thread::sleep_for (std::chrono::seconds (2 )); return 0 ; }
场景2:线程间一次性信号(类似 latch) 场景 :主线程等待子线程初始化完成的信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <iostream> #include <thread> #include <future> void worker (std::promise<void > ready_signal) { std::cout << "Worker initializing..." << std::endl; std::this_thread::sleep_for (std::chrono::seconds (2 )); std::cout << "Worker ready!" << std::endl; ready_signal.set_value (); } int main () { std::promise<void > promise; std::future<void > future = promise.get_future (); std::thread t (worker, std::move(promise)) ; std::cout << "Main thread waiting..." << std::endl; future.get (); std::cout << "Main thread continues!" << std::endl; t.join (); return 0 ; }
6.5 std::packaged_task 任务包装器 核心特性 :将普通函数包装成可获取 future 的任务。
1 2 3 4 5 std::packaged_task<int (int ) > task ([](int x) { return x * 2 ; }) ;std::future<int > fut = task.get_future (); task (21 ); int result = fut.get ();
场景:简易线程池实现 场景 :将任务提交到队列,工作线程从队列中取出执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 #include <iostream> #include <thread> #include <future> #include <queue> #include <functional> #include <mutex> #include <condition_variable> class SimpleThreadPool { std::vector<std::thread> workers_; std::queue<std::packaged_task<int ()>> tasks_; std::mutex mutex_; std::condition_variable cv_; bool stop_ = false ; public : SimpleThreadPool (size_t num_threads) { for (size_t i = 0 ; i < num_threads; ++i) { workers_.emplace_back ([this ]() { while (true ) { std::packaged_task<int ()> task; { std::unique_lock lock (mutex_); cv_.wait (lock, [this ]{ return stop_ || !tasks_.empty (); }); if (stop_ && tasks_.empty ()) return ; task = std::move (tasks_.front ()); tasks_.pop (); } task (); } }); } } std::future<int > submit (std::function<int ()> func) { std::packaged_task<int () > task (func) ; std::future<int > result = task.get_future (); { std::unique_lock lock (mutex_) ; tasks_.push (std::move (task)); } cv_.notify_one (); return result; } ~SimpleThreadPool () { { std::unique_lock lock (mutex_) ; stop_ = true ; } cv_.notify_all (); for (auto & worker : workers_) { worker.join (); } } }; int main () { SimpleThreadPool pool (4 ) ; std::vector<std::future<int >> results; for (int i = 0 ; i < 8 ; ++i) { results.push_back (pool.submit ([i]() { std::this_thread::sleep_for (std::chrono::milliseconds (100 )); return i * i; })); } for (auto & fut : results) { std::cout << "Result: " << fut.get () << std::endl; } return 0 ; }
6.6 std::shared_future 共享结果 关键差异 :
std::future:只能 get() 一次(移动语义)。
std::shared_future:可多次 get()(拷贝语义)。
场景:广播配置更新 场景 :多个线程等待同一个配置加载完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include <iostream> #include <thread> #include <future> #include <vector> std::string load_config () { std::this_thread::sleep_for (std::chrono::seconds (2 )); return "Config{timeout=30, retry=3}" ; } int main () { std::shared_future<std::string> shared_fut = std::async (std::launch::async, load_config).share (); std::vector<std::thread> workers; for (int i = 0 ; i < 5 ; ++i) { workers.emplace_back ([shared_fut, i]() { std::cout << "Worker " << i << " waiting for config..." << std::endl; std::string config = shared_fut.get (); std::cout << "Worker " << i << " got: " << config << std::endl; }); } for (auto & t : workers) { t.join (); } return 0 ; }
6.7 future 状态查询与超时等待 API 说明 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 std::future<int > fut = std::async (task); int result = fut.get ();if (fut.wait_for (std::chrono::seconds (0 )) == std::future_status::ready) { result = fut.get (); } auto status = fut.wait_for (std::chrono::seconds (5 ));switch (status) { case std::future_status::ready: case std::future_status::timeout: case std::future_status::deferred: }
场景:轮询任务进度 场景 :每隔 500ms 检查任务是否完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <iostream> #include <future> #include <chrono> int long_running_task () { for (int i = 0 ; i < 5 ; ++i) { std::this_thread::sleep_for (std::chrono::seconds (1 )); std::cout << "Progress: " << (i + 1 ) * 20 << "%" << std::endl; } return 42 ; } int main () { auto fut = std::async (std::launch::async, long_running_task); while (true ) { auto status = fut.wait_for (std::chrono::milliseconds (500 )); if (status == std::future_status::ready) { std::cout << "Task completed! Result: " << fut.get () << std::endl; break ; } else { std::cout << "Still running..." << std::endl; } } return 0 ; }
6.8 最佳实践与陷阱 ⚠️ 常见陷阱
get() 只能调用一次
1 2 3 auto fut = std::async (task);int a = fut.get (); int b = fut.get ();
忘记 get() 导致异常丢失
1 2 std::async ([]{ throw std::runtime_error ("error" ); });
默认策略的不确定性
1 2 auto fut = std::async (task); auto fut = std::async (std::launch::async, task);
future 析构会阻塞(async 特有)
1 2 3 4 { auto fut = std::async (std::launch::async, long_task); }
✅ 推荐做法
场景
推荐工具
理由
简单并行计算
std::async
自动管理线程
事件驱动/回调
std::promise
灵活控制时机
任务队列/线程池
std::packaged_task
延迟执行
多线程读取同一结果
std::shared_future
支持多次 get()
需要超时检测
wait_for / wait_until
避免无限阻塞
7. 最佳实践总结 7.1 选型决策树 1 2 3 4 5 6 7 8 9 需要并发? ├─ 简单计数/标志 → std::atomic ├─ 等待 N 个任务完成 → std::latch (C++20) ├─ 多阶段同步 → std::barrier (C++20) ├─ 限制资源数量 → std::counting_semaphore (C++20) ├─ 生产者-消费者 → std::condition_variable + std::unique_lock ├─ 读多写少 → std::shared_mutex + std::shared_lock ├─ 获取异步返回值 → std::future + std::async └─ 通用互斥 → std::mutex + std::scoped_lock
7.2 性能对比
工具
开销
适用场景
std::atomic
极低
简单变量
std::mutex
低
临界区保护
std::shared_mutex
中
读多写少
std::condition_variable
中
事件通知
std::semaphore
低
资源计数
std::latch
极低
一次性同步
7.3 常见陷阱
忘记 join/detach :导致 std::terminate()。
死锁 :使用 std::scoped_lock 同时锁定多个 mutex。
虚假唤醒 :condition_variable 必须使用谓词。
数据竞争 :永远使用 RAII 管理锁,避免手动 lock()/unlock()。
detach 陷阱 :确保主线程存活或避免访问外部引用。
7.4 刷题 1114. 按序打印 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 给你一个类: public class Foo { public void first() { print("first"); } public void second() { print("second"); } public void third() { print("third"); } } 三个不同的线程 A、B、C 将会共用一个 Foo 实例。 线程 A 将会调用 first() 方法 线程 B 将会调用 second() 方法 线程 C 将会调用 third() 方法 请设计修改程序,以确保 second() 方法在 first() 方法之后被执行,third() 方法在 second() 方法之后被执行。 提示: 尽管输入中的数字似乎暗示了顺序,但是我们并不保证线程在操作系统中的调度顺序。 你看到的输入格式主要是为了确保测试的全面性。 示例 1: 输入:nums = [1,2,3] 输出:"firstsecondthird" 解释: 有三个线程会被异步启动。输入 [1,2,3] 表示线程 A 将会调用 first() 方法,线程 B 将会调用 second() 方法,线程 C 将会调用 third() 方法。正确的输出是 "firstsecondthird"。 示例 2: 输入:nums = [1,3,2] 输出:"firstsecondthird" 解释: 输入 [1,3,2] 表示线程 A 将会调用 first() 方法,线程 B 将会调用 third() 方法,线程 C 将会调用 second() 方法。正确的输出是 "firstsecondthird"。 提示: nums 是 [1, 2, 3] 的一组排列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #include <condition_variable> #include <shared_mutex> #include <mutex> #include <memory> class Foo {private : std::vector<std::unique_ptr<std::condition_variable>> cvs; std::vector<bool > todo; std::mutex mtx; public : Foo () { for (int i = 0 ; i <3 ; i++) { cvs.push_back (std::make_unique <std::condition_variable>()); todo.push_back (false ); } todo[0 ] = true ; } void first (function<void ()> printFirst) { std::unique_lock lock (mtx) ; cvs[0 ]->wait (lock, [&]() { return todo[0 ]; }); printFirst (); todo[1 ] = true ; cvs[1 ]->notify_one (); } void second (function<void ()> printSecond) { std::unique_lock lock (mtx) ; cvs[1 ]->wait (lock, [&]() { return todo[1 ]; }); printSecond (); todo[2 ] = true ; cvs[2 ]->notify_one (); } void third (function<void ()> printThird) { std::unique_lock lock (mtx) ; cvs[2 ]->wait (lock, [&]() { return todo[2 ]; }); printThird (); } };
1115. 交替打印 FooBar 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 给你一个类: class FooBar { public void foo() { for (int i = 0; i < n; i++) { print("foo"); } } public void bar() { for (int i = 0; i < n; i++) { print("bar"); } } } 两个不同的线程将会共用一个 FooBar 实例: 线程 A 将会调用 foo() 方法,而 线程 B 将会调用 bar() 方法 请设计修改程序,以确保 "foobar" 被输出 n 次。 示例 1: 输入:n = 1 输出:"foobar" 解释:这里有两个线程被异步启动。其中一个调用 foo() 方法, 另一个调用 bar() 方法,"foobar" 将被输出一次。 示例 2: 输入:n = 2 输出:"foobarfoobar" 解释:"foobar" 将被输出两次。 提示: 1 <= n <= 1000
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <mutex> #include <condition_variable> #include <vector> class FooBar {private : int n; std::mutex mtx; std::vector<bool > todo; std::condition_variable cv; bool done = false ; public : FooBar (int n) { this ->n = n; for (int i = 0 ; i < 2 ; i++) todo.push_back (false ); todo[0 ] = true ; } void foo (function<void ()> printFoo) { for (int i = 0 ; i < n; i++) { std::unique_lock lock (mtx) ; cv.wait (lock, [&]() { return todo[0 ]; }); printFoo (); todo[0 ] = false ; todo[1 ] = true ; cv.notify_all (); } } void bar (function<void ()> printBar) { for (int i = 0 ; i < n; i++) { std::unique_lock lock (mtx) ; cv.wait (lock, [&]() { return todo[1 ]; }); printBar (); todo[1 ] = false ; todo[0 ] = true ; cv.notify_all (); } } };
1116. 打印零与奇偶数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 现有函数 printNumber 可以用一个整数参数调用,并输出该整数到控制台。 例如,调用 printNumber(7) 将会输出 7 到控制台。 给你类 ZeroEvenOdd 的一个实例,该类中有三个函数:zero、even 和 odd 。ZeroEvenOdd 的相同实例将会传递给三个不同线程: 线程 A:调用 zero() ,只输出 0 线程 B:调用 even() ,只输出偶数 线程 C:调用 odd() ,只输出奇数 修改给出的类,以输出序列 "010203040506..." ,其中序列的长度必须为 2n 。 实现 ZeroEvenOdd 类: ZeroEvenOdd(int n) 用数字 n 初始化对象,表示需要输出的数。 void zero(printNumber) 调用 printNumber 以输出一个 0 。 void even(printNumber) 调用printNumber 以输出偶数。 void odd(printNumber) 调用 printNumber 以输出奇数。 示例 1: 输入:n = 2 输出:"0102" 解释:三条线程异步执行,其中一个调用 zero(),另一个线程调用 even(),最后一个线程调用odd()。正确的输出为 "0102"。 示例 2: 输入:n = 5 输出:"0102030405" 提示: 1 <= n <= 1000
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 #include <mutex> #include <condition_variable> class ZeroEvenOdd {private : int n; std::mutex mtx; std::condition_variable cv; int flag = 0 ; public : ZeroEvenOdd (int n) { this ->n = n; } void zero (function<void (int )> printNumber) { for (int i = 0 ; i < n; ++i) { std::unique_lock lock (mtx) ; cv.wait (lock, [&]() { return flag == 0 ;}); printNumber (0 ); flag = (i % 2 ) + 1 ; cv.notify_all (); } } void even (function<void (int )> printNumber) { for (int i = 2 ; i <= n; i+=2 ) { std::unique_lock lock (mtx) ; cv.wait (lock, [&]() { return flag == 2 ;}); printNumber (i); flag = 0 ; cv.notify_all (); } } void odd (function<void (int )> printNumber) { for (int i = 1 ; i <= n; i+=2 ) { std::unique_lock lock (mtx) ; cv.wait (lock, [&]() { return flag == 1 ;}); printNumber (i); flag = 0 ; cv.notify_all (); } } };
1117. H2O 生成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 现在有两种线程,氧 oxygen 和氢 hydrogen,你的目标是组织这两种线程来产生水分子。 存在一个屏障(barrier)使得每个线程必须等候直到一个完整水分子能够被产生出来。 氢和氧线程会被分别给予 releaseHydrogen 和 releaseOxygen 方法来允许它们突破屏障。 这些线程应该三三成组突破屏障并能立即组合产生一个水分子。 你必须保证产生一个水分子所需线程的结合必须发生在下一个水分子产生之前。 换句话说: 如果一个氧线程到达屏障时没有氢线程到达,它必须等候直到两个氢线程到达。 如果一个氢线程到达屏障时没有其它线程到达,它必须等候直到一个氧线程和另一个氢线程到达。 书写满足这些限制条件的氢、氧线程同步代码。 示例 1: 输入: water = "HOH" 输出: "HHO" 解释: "HOH" 和 "OHH" 依然都是有效解。 示例 2: 输入: water = "OOHHHH" 输出: "HHOHHO" 解释: "HOHHHO", "OHHHHO", "HHOHOH", "HOHHOH", "OHHHOH", "HHOOHH", "HOHOHH" 和 "OHHOHH" 依然都是有效解。 提示: 3 * n == water.length 1 <= n <= 20 water[i] == 'O' or 'H' 输入字符串 water 中的 'H' 总数将会是 2 * n 。 输入字符串 water 中的 'O' 总数将会是 n 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #include <condition_variable> #include <mutex> class H2O {private : std::mutex mutex; std::condition_variable cv; int h_count = 0 ; int o_count = 0 ; public : H2O () { } void hydrogen (function<void ()> releaseHydrogen) { std::unique_lock lock (mutex) ; cv.wait (lock, [&]() { return h_count < 2 ; }); releaseHydrogen (); h_count++; if (h_count == 2 && o_count == 1 ) { h_count = 0 ; o_count = 0 ; } cv.notify_all (); } void oxygen (function<void ()> releaseOxygen) { std::unique_lock lock (mutex) ; cv.wait (lock, [&]() { return o_count < 1 ; }); releaseOxygen (); o_count++; if (h_count == 2 && o_count == 1 ) { h_count = 0 ; o_count = 0 ; } cv.notify_all (); } };
1195. 交替打印字符串 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 编写一个可以从 1 到 n 输出代表这个数字的字符串的程序,但是: 如果这个数字可以被 3 整除,输出 "fizz"。 如果这个数字可以被 5 整除,输出 "buzz"。 如果这个数字可以同时被 3 和 5 整除,输出 "fizzbuzz"。 例如,当 n = 15,输出: 1, 2, fizz, 4, buzz, fizz, 7, 8, fizz, buzz, 11, fizz, 13, 14, fizzbuzz。 假设有这么一个类: class FizzBuzz { public FizzBuzz(int n) { ... } // constructor public void fizz(printFizz) { ... } // only output "fizz" public void buzz(printBuzz) { ... } // only output "buzz" public void fizzbuzz(printFizzBuzz) { ... } // only output "fizzbuzz" public void number(printNumber) { ... } // only output the numbers } 请你实现一个有四个线程的多线程版 FizzBuzz, 同一个 FizzBuzz 实例会被如下四个线程使用: 线程A将调用 fizz() 来判断是否能被 3 整除,如果可以,则输出 fizz。 线程B将调用 buzz() 来判断是否能被 5 整除,如果可以,则输出 buzz。 线程C将调用 fizzbuzz() 来判断是否同时能被 3 和 5 整除,如果可以,则输出 fizzbuzz。 线程D将调用 number() 来实现输出既不能被 3 整除也不能被 5 整除的数字。 提示: 本题已经提供了打印字符串的相关方法,如 printFizz() 等,具体方法名请参考答题模板中的注释部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 #include <mutex> #include <condition_variable> class FizzBuzz {private : int n; int j; std::mutex mutex; std::condition_variable cv; public : FizzBuzz (int n) { this ->n = n; this ->j = 1 ; } void fizz (function<void ()> printFizz) { for (int i = 1 ; i <= n; i++) { if (i % 3 == 0 && i % 15 != 0 ) { std::unique_lock lock (mutex) ; cv.wait (lock, [&]() { return i == j; }); printFizz (); j++; cv.notify_all (); } } } void buzz (function<void ()> printBuzz) { for (int i = 1 ; i <= n; i++) { if (i % 5 == 0 && i % 15 != 0 ) { std::unique_lock lock (mutex) ; cv.wait (lock, [&]() { return i == j; }); printBuzz (); j++; cv.notify_all (); } } } void fizzbuzz (function<void ()> printFizzBuzz) { for (int i = 1 ; i <= n; i++) { if (i % 15 == 0 ) { std::unique_lock lock (mutex) ; cv.wait (lock, [&]() { return i == j; }); printFizzBuzz (); j++; cv.notify_all (); } } } void number (function<void (int )> printNumber) { for (int i = 1 ; i <= n; i++) { if (i % 15 != 0 && i % 3 != 0 && i % 5 != 0 ) { std::unique_lock lock (mutex) ; cv.wait (lock, [&]() { return i == j; }); printNumber (i); j++; cv.notify_all (); } } } };
8. 参考资源