点击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号”
干货福利,第一时间送达!
最近有小伙伴说没有收到当天的文章推送,这是因为微信改了推送机制,有一部分小伙伴刷不到当天的文章,一些比较实用的知识和信息,错过了就是错过了,建议大家加个星标⭐️,就能第一时间收到推送。
小伙伴们大家好,我是飞宇。
最近李伟老师讲了《C++ Primer 5th》这本书的视频,他是美国微软高级工程师,清华大学博士,帮忙推广一下,感兴趣的可以看看。
今天继续更新《Effective C++》和《C++并发编程实战》的读书笔记,下面是已经更新过的内容:
《C++并发编程实战》读书笔记(1):并发、线程管控
《C++并发编程实战》读书笔记(2):并发操作的同步
《C++并发编程实战》读书笔记(3):内存模型和原子操作
《C++并发编程实战》读书笔记(4):设计并发数据结构
《Effective C++》读书笔记(1):让自己习惯C++
《Effective C++》读书笔记(2):构造/析构/赋值运算
《Effective C++》读书笔记(3):资源管理
《Effective C++》读书笔记(4):设计与声明
《Effective C++》读书笔记(5):实现
第9章 高级线程管理
9.1 线程池
大多数程序中并不方便给每个任务分配单独的线程,但仍可通过线程池来充分利用可调配的并发算力:将可同时执行的任务提交到线程池,放入任务队列中等待,工作线程循环地领取并执行任务。
以下是一种实现,提交任务后返回future,提交者可通过future获取任务结果,任务先被包装成packaged_task再被包装成function,由工作线程来处理。
class ThreadPool {
private:
std::vector<std::thread> threads;
ThreadsafeQueue<std::function<void()>> taskQueue;
std::atomic<bool> stop;
join_threads joiner;
public:
ThreadPool(size_t numThreads = std::thread::hardware_concurrency())
: stop(false),joiner(threads) {
for (size_t i = 0; i < numThreads; ++i) {
threads.emplace_back([this]() {
while (!stop) {
run_pending_task();
}
});
}
}
// 避免所有线程都在等待其他线程完成任务
void run_pending_task(){
std::function<void()> task;
if (taskQueue.try_pop(task))
task();
else
std::this_thread::yield();
}
~ThreadPool() {
stop = true;
}
template<class F, class... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task
>( std::bind(std::forward
(f), std::forward(args)...) );
std::future
result = task->get_future(); taskQueue.push([task]() { (*task)(); });
return result;
}
};
例如可以实现基于线程池的快排:
template <typename T>
struct sorter {
ThreadPool pool;
std::list
do_sort(std::list & chunk_data) { if (chunk_data.empty()) return chunk_data;
// 将原list分为大小两段
std::list
result; result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),[&](T const& val) { return val < partition_val; });
// 两段分别处理
std::list
new_lower_chunk; new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);
auto new_lower = pool.submit(std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
std::list
new_higher(do_sort(chunk_data)); result.splice(result.end(), new_higher);
// 避免所有线程彼此等待
while (!new_lower.is_ready()) {
pool.run_pending_task();
}
result.splice(result.begin(), new_lower.get());
return result;
}
};
template <typename T>
std::list
parallel_quick_sort(std::list input) { if (input.empty()) return input;
sorter
s; return s.do_sort(input);
}
上述线程池仅具备一个全局的任务队列,即使使用无锁队列来优化仍然会有严重的缓存乒乓,导致性能浪费。可以为每个线程配备thread_local任务队列,仅当线程自身线程没有任务时才从全局队列领取任务。
此外,倘若某线程自身队列为空,而另一线程的队列为满,需支持窃取任务。首先实现支持这样操作的队列,仅用锁简单实现,一端用于push/pop,另一端用于steal。
class work_stealing_queue {
private:
typedef std::function<void()> data_type;
std::deque
the_queue; mutable std::mutex the_mutex;
public:
work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other) = delete;
work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
void push(data_type data) {
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool try_pop(data_type& res) {
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty()) return false;
res = std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type& res) {
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty()) return false;
res = std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
基于上面的结构,可以实现支持任务窃取的线程池:
class thread_pool {
private:
typedef std::function<void()> task_type;
std::vector<std::thread> threads;
join_threads joiner;
std::atomic_bool done;
// 全局任务队列
thread_safe_queue
pool_work_queue; std::vector<std::unique_ptr
> queues; // 指向线程独有的任务队列
static thread_local work_stealing_queue* local_work_queue;
// 线程编号
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_) {
my_index = my_index_;
local_work_queue = queues[my_index].get();
while (!done) {
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type& task) {
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task) {
return pool_work_queue.try_pop(task);
}
// 遍历,偷取任务
bool pop_task_from_other_thread_queue(task_type& task) {
for (unsigned i = 0; i < queues.size(); ++i) {
unsigned const index = (my_index + i + 1) % queues.size();
if (queues[index]->try_steal(task)) {
return true;
}
}
return false;
}
public:
thread_pool() : joiner(threads), done(false) {
unsigned const thread_count = std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; ++i) {
queues.push_back(std::unique_ptr
( new work_stealing_queue));
threads.push_back(
std::thread(&thread_pool::worker_thread, this, i));
}
} catch (...) {
done = true;
throw;
}
}
~thread_pool() { done = true; }
template <class F, class... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task
>( std::bind(std::forward
(f), std::forward (args)...)); std::future
result = task->get_future(); if (local_work_queue) {
local_work_queue->push([task]() { (*task)(); });
} else {
pool_work_queue.push([task]() { (*task)(); });
}
return result;
}
void run_pending_task() {
task_type task;
if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) ||
pop_task_from_other_thread_queue(task)) {
task();
} else {
std::this_thread::yield();
}
}
};
9.2 中断线程
C++20中引入了能接收中断、自动join的jthread。但自己实现也不复杂。借助thread_local的interrupt_flag来辅助实现,通过interrupt成员函数来设置中断,并借此实现可中断的条件变量/future上的等待。
thread_local interrupt_flag this_thread_interrupt_flag;
class interruptible_thread {
std::thread internal_thread;
interrupt_flag* flag;
public:
template <typename FunctionType>
interruptible_thread(FunctionType f) {
std::promise
p; internal_thread = std::thread([f, &p] {
p.set_value(&this_thread_interrupt_flag);
try{
f();
}catch(...){}
});
flag = p.get_future().get();
}
// 设置中断
void interrupt() {
if (flag) {
flag->set();
}
}
};
// 如果已设置中断则抛出异常
void interruption_point() {
if (this_thread_interrupt_flag.is_set()) {
throw std::exception();
}
}
// 可中断的条件变量等待
template <typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& lk) {
this_thread_interrupt_flag.wait(cv, lk);
}
// 可中断的future等待
template <typename T, typename Lockable>
void interruptible_wait(std::future
& uf, Lockable& lk) {while (!this_thread_interrupt_flag.is_set()) {
if (uf.wait_for(lk, 1ms) == std::future_status::ready) break;
}
}
其中,interrupt_flag的实现如下,基于condition_variable_any而非普通条件变量,set时(即设置中断时)唤醒条件变量,wait时多次检查是否设置中断。
class interrupt_flag {
std::atomic<bool> flag;
std::condition_variable_any* thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag() : thread_cond_any(nullptr) {}
void set() {
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if (thread_cond_any) {
thread_cond_any->notify_all();
}
}
bool is_set() const { return flag.load(std::memory_order_relaxed); }
template <typename Lockable>
void wait(std::condition_variable_any& cv, Lockable& lk) {
struct custom_lock {
interrupt_flag* self;
Lockable& lk;
custom_lock(interrupt_flag* self_,
std::condition_variable_any& cond, Lockable& lk_)
: self(self_), lk(lk_) {
self->set_clear_mutex.lock();
self->thread_cond_any = &cond;
}
void unlock() {
lk.unlock();
self->set_clear_mutex.unlock();
}
void lock() { std::lock(self->set_clear_mutex, lk); }
~custom_lock() { self->thread_cond_any = nullptr; }
};
custom_lock cl(this, cv, lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
};
可以用try/catch来捕获中断,按某种方式处理然后继续执行。中断线程在实际应用中的常见场景是运行程序前开启后台任务,程序运行完退出时中断后台任务。
第10章 并行算法函数
C++17向标准库加入了并行算法函数,在原有函数的参数列表前新增了执行策略参数。
不同策略会影响算法函数的复杂度、抛出异常时的行为、何时何地何种方式执行。其中seq代表顺序策略,令算法函数在发起调用的线程上执行全部操作,没有内存次序限制;par代表并行策略,内部操作可能在发起调用的线程上也可能另外创建线程执行,涉及的变量绝不能引发数据竞争;par_unseq代表非顺序并行策略,并行化最高,涉及的变量不得以任何形式同步。
例如某网站有庞大的日志,需要逐行处理日志提炼各项信息,最后聚合结果,类似mapreduce。由于每行日志的处理都独立,只需最后总数正确,所以可以用transfrom_reduce来处理:
struct log_info {
std::string page;
time_t visit_time;
std::string browser;
};
extern log_info parse_log_line(std::string const &line);
using visit_map_type = std::unordered_map<std::string, unsigned long long>;
visit_map_type count_visits_per_page(
std::vector<std::string> const &log_lines) {
struct combine_visits {
visit_map_type operator()(visit_map_type lhs,
visit_map_type rhs) const {
if (lhs.size() < rhs.size()) std::swap(lhs, rhs);
for (auto const &entry : rhs) {
lhs[entry.first] += entry.second;
}
return lhs;
}
visit_map_type operator()(log_info log, visit_map_type map) const {
++map[log.page];
return map;
}
visit_map_type operator()(visit_map_type map, log_info log) const {
++map[log.page];
return map;
}
visit_map_type operator()(log_info log1, log_info log2) const {
visit_map_type map;
++map[log1.page];
++map[log2.page];
return map;
}
};
return std::transform_reduce(std::execution::par, log_lines.begin(),
log_lines.end(), visit_map_type(),
combine_visits(), parse_log_line);
}
第11章 多线程应用的测试和除错
跟并发相关的错误主要分为多余的阻塞和条件竞争。多余的阻塞包括死锁、活锁(例如两自旋锁互相等待)、IO等外部阻塞。条件竞争包括数据竞争(对共享内存区域的并发访问未采取同步)、受到破坏的不变量、生存期问题。
定位这些错误的技法包括审查代码并定位潜在错误、通过测试定位错误、设计可测试的代码、多线程测试技术(压力测试,组合模拟测试,特殊程序库)、测试多线程代码的性能。
这里仅作简介,更详细的内容还需读原文。
—— EOF —— 你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节跳动电商等部门担任Linux C/C++后端研发工程师。
同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击此处查看我以前的学习笔记&经验&分享的资源。
我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。
欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会。
加个微信,打开另一扇窗