《C++并发编程实战》读书笔记(6):高级线程管理、并行算法函数、测试与除错

C语言与CPP编程 2023-09-13 08:30

击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号

干货福利,第一时间送达!

最近有小伙伴说没有收到当天的文章推送,这是因为微信改了推送机制,有一部分小伙伴刷不到当天的文章,一些比较实用的知识和信息,错过了就是错过了,建议大家加个星标⭐️,就能第一时间收到推送。

小伙伴们大家好,我是飞宇。


最近李伟老师讲了《C++ Primer 5th》这本书的视频,他是美国微软高级工程师,清华大学博士,帮忙推广一下,感兴趣的可以看看。


今天继续更新《Effective C++》和《C++并发编程实战的读书笔记,下面是已经更新过的内容:

《C++并发编程实战》读书笔记(1):并发、线程管控

《C++并发编程实战》读书笔记(2):并发操作的同步

《C++并发编程实战》读书笔记(3):内存模型和原子操作

《C++并发编程实战》读书笔记(4):设计并发数据结构

《Effective C++》读书笔记(1):让自己习惯C++

《Effective C++》读书笔记(2):构造/析构/赋值运算

《Effective C++》读书笔记(3):资源管理

《Effective C++》读书笔记(4):设计与声明

《Effective C++》读书笔记(5):实现

第9章 高级线程管理

9.1 线程池

    大多数程序中并不方便给每个任务分配单独的线程,但仍可通过线程池来充分利用可调配的并发算力:将可同时执行的任务提交到线程池,放入任务队列中等待,工作线程循环地领取并执行任务。

    以下是一种实现,提交任务后返回future,提交者可通过future获取任务结果,任务先被包装成packaged_task再被包装成function,由工作线程来处理。

class ThreadPool {private:    std::vector<std::thread> threads;    ThreadsafeQueue<std::function<void()>> taskQueue;    std::atomic<bool> stop;    join_threads joiner;public:    ThreadPool(size_t numThreads = std::thread::hardware_concurrency())         : stop(false),joiner(threads) {        for (size_t i = 0; i < numThreads; ++i) {            threads.emplace_back([this]() {                while (!stop) {                    run_pending_task();                }            });        }    }        // 避免所有线程都在等待其他线程完成任务    void run_pending_task(){        std::function<void()> task;                       if (taskQueue.try_pop(task))            task();        else            std::this_thread::yield();    }        ~ThreadPool() {        stop = true;    }
template<class F, class... Args>    auto submit(F&& fArgs&&... args) -> std::future<decltype(f(args...))> { using ReturnType = decltype(f(args...)); auto task = std::make_shared<std::packaged_task>( std::bind(std::forward(f), std::forward(args)...) ); std::future result = task->get_future(); taskQueue.push([task]() { (*task)(); }); return result;    }};


    例如可以实现基于线程池的快排:

template <typename T>struct sorter {    ThreadPool pool;
std::list do_sort(std::list& chunk_data) {        if (chunk_data.empty()) return chunk_data;        // 将原list分为大小两段 std::list result; result.splice(result.begin(), chunk_data, chunk_data.begin());        T const& partition_val = *result.begin();        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),[&](T const& val) { return val < partition_val; });        // 两段分别处理 std::list new_lower_chunk;        new_lower_chunk.splice(new_lower_chunk.end(), chunk_data,chunk_data.begin(), divide_point);        auto new_lower = pool.submit(std::bind(&sorter::do_sort, thisstd::move(new_lower_chunk)));        std::list new_higher(do_sort(chunk_data)); result.splice(result.end(), new_higher); // 避免所有线程彼此等待 while (!new_lower.is_ready()) { pool.run_pending_task();        } result.splice(result.begin(), new_lower.get()); return result; }};
template <typename T>std::list parallel_quick_sort(std::list input) {    if (input.empty()) return input;    sorter s; return s.do_sort(input);}


    上述线程池仅具备一个全局的任务队列,即使使用无锁队列来优化仍然会有严重的缓存乒乓,导致性能浪费。可以为每个线程配备thread_local任务队列,仅当线程自身线程没有任务时才从全局队列领取任务。

    此外,倘若某线程自身队列为空,而另一线程的队列为满,需支持窃取任务。首先实现支持这样操作的队列,仅用锁简单实现,一端用于push/pop,另一端用于steal。

class work_stealing_queue {private:    typedef std::function<void()> data_type;    std::deque the_queue;    mutable std::mutex the_mutex;
public: work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other) = delete; work_stealing_queue& operator=(const work_stealing_queue& other) = delete;
void push(data_type data) { std::lock_guard<std::mutex> lock(the_mutex); the_queue.push_front(std::move(data)); }
bool try_pop(data_type& res) { std::lock_guard<std::mutex> lock(the_mutex); if (the_queue.empty()) return false; res = std::move(the_queue.front()); the_queue.pop_front(); return true; }
bool try_steal(data_type& res) { std::lock_guard<std::mutex> lock(the_mutex); if (the_queue.empty()) return false; res = std::move(the_queue.back()); the_queue.pop_back(); return true; }};


    基于上面的结构,可以实现支持任务窃取的线程池:

class thread_pool {private:    typedef std::function<void()> task_type;    std::vector<std::thread> threads;    join_threads joiner;    std::atomic_bool done;    // 全局任务队列    thread_safe_queue pool_work_queue;    std::vector<std::unique_ptr> queues;    // 指向线程独有的任务队列    static thread_local work_stealing_queue* local_work_queue;    // 线程编号    static thread_local unsigned my_index;
void worker_thread(unsigned my_index_) { my_index = my_index_; local_work_queue = queues[my_index].get(); while (!done) { run_pending_task(); } }
bool pop_task_from_local_queue(task_type& task) { return local_work_queue && local_work_queue->try_pop(task); }
bool pop_task_from_pool_queue(task_type& task) { return pool_work_queue.try_pop(task); } // 遍历,偷取任务 bool pop_task_from_other_thread_queue(task_type& task) { for (unsigned i = 0; i < queues.size(); ++i) { unsigned const index = (my_index + i + 1) % queues.size(); if (queues[index]->try_steal(task)) { return true; }        } return false; }
public: thread_pool() : joiner(threads), done(false) { unsigned const thread_count = std::thread::hardware_concurrency(); try { for (unsigned i = 0; i < thread_count; ++i) { queues.push_back(std::unique_ptr( new work_stealing_queue)); threads.push_back( std::thread(&thread_pool::worker_thread, this, i)); } } catch (...) { done = true; throw; } }
~thread_pool() { done = true; }
template <class F, class... Args> auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> { using ReturnType = decltype(f(args...)); auto task = std::make_shared<std::packaged_task>( std::bind(std::forward(f), std::forward(args)...)); std::future result = task->get_future(); if (local_work_queue) { local_work_queue->push([task]() { (*task)(); }); } else { pool_work_queue.push([task]() { (*task)(); }); } return result; }
void run_pending_task() { task_type task; if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) || pop_task_from_other_thread_queue(task)) { task(); } else { std::this_thread::yield(); } }};


9.2 中断线程 

    C++20中引入了能接收中断、自动join的jthread。但自己实现也不复杂。借助thread_local的interrupt_flag来辅助实现,通过interrupt成员函数来设置中断,并借此实现可中断的条件变量/future上的等待。

thread_local interrupt_flag this_thread_interrupt_flag;
class interruptible_thread { std::thread internal_thread; interrupt_flag* flag;
public: template <typename FunctionType> interruptible_thread(FunctionType f) { std::promise p; internal_thread = std::thread([f, &p] { p.set_value(&this_thread_interrupt_flag);            try{             f();            }catch(...){} }); flag = p.get_future().get(); }    // 设置中断 void interrupt() { if (flag) { flag->set(); } }};// 如果已设置中断则抛出异常void interruption_point() { if (this_thread_interrupt_flag.is_set()) { throw std::exception(); }}// 可中断的条件变量等待template <typename Lockable>void interruptible_wait(std::condition_variable_any& cv, Lockable& lk) { this_thread_interrupt_flag.wait(cv, lk);}// 可中断的future等待template <typename T, typename Lockable>void interruptible_wait(std::future& uf, Lockable& lk) { while (!this_thread_interrupt_flag.is_set()) { if (uf.wait_for(lk, 1ms) == std::future_status::ready) break; }}


    其中,interrupt_flag的实现如下,基于condition_variable_any而非普通条件变量,set时(即设置中断时)唤醒条件变量,wait时多次检查是否设置中断。

class interrupt_flag {    std::atomic<bool> flag;    std::condition_variable_any* thread_cond_any;    std::mutex set_clear_mutex;
public: interrupt_flag() : thread_cond_any(nullptr) {} void set() { flag.store(true, std::memory_order_relaxed); std::lock_guard<std::mutex> lk(set_clear_mutex); if (thread_cond_any) { thread_cond_any->notify_all(); } }
bool is_set() const { return flag.load(std::memory_order_relaxed); }
template <typename Lockable> void wait(std::condition_variable_any& cv, Lockable& lk) { struct custom_lock { interrupt_flag* self; Lockable& lk; custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) : self(self_), lk(lk_) { self->set_clear_mutex.lock(); self->thread_cond_any = &cond; } void unlock() { lk.unlock(); self->set_clear_mutex.unlock(); } void lock() { std::lock(self->set_clear_mutex, lk); } ~custom_lock() { self->thread_cond_any = nullptr; } }; custom_lock cl(this, cv, lk); interruption_point(); cv.wait(cl); interruption_point(); }};


    可以用try/catch来捕获中断,按某种方式处理然后继续执行。中断线程在实际应用中的常见场景是运行程序前开启后台任务,程序运行完退出时中断后台任务。



第10章 并行算法函数

    C++17向标准库加入了并行算法函数,在原有函数的参数列表前新增了执行策略参数。中定义了三种执行策略sequenced_policy、parallel_policy、parallel_unsequenced_policy,以及对应的传给并行算法函数的对象seq、par、par_unseq。

    不同策略会影响算法函数的复杂度、抛出异常时的行为、何时何地何种方式执行。其中seq代表顺序策略,令算法函数在发起调用的线程上执行全部操作,没有内存次序限制;par代表并行策略,内部操作可能在发起调用的线程上也可能另外创建线程执行,涉及的变量绝不能引发数据竞争;par_unseq代表非顺序并行策略,并行化最高,涉及的变量不得以任何形式同步。

    例如某网站有庞大的日志,需要逐行处理日志提炼各项信息,最后聚合结果,类似mapreduce。由于每行日志的处理都独立,只需最后总数正确,所以可以用transfrom_reduce来处理:

struct log_info {    std::string page;    time_t visit_time;    std::string browser;};
extern log_info parse_log_line(std::string const &line);
using visit_map_type = std::unordered_map<std::string, unsigned long long>;
visit_map_type count_visits_per_page( std::vector<std::string> const &log_lines) { struct combine_visits { visit_map_type operator()(visit_map_type lhs, visit_map_type rhs) const { if (lhs.size() < rhs.size()) std::swap(lhs, rhs); for (auto const &entry : rhs) { lhs[entry.first] += entry.second; } return lhs; }
visit_map_type operator()(log_info log, visit_map_type map) const { ++map[log.page]; return map; } visit_map_type operator()(visit_map_type map, log_info log) const { ++map[log.page]; return map; } visit_map_type operator()(log_info log1, log_info log2) const { visit_map_type map; ++map[log1.page]; ++map[log2.page]; return map; } };
return std::transform_reduce(std::execution::par, log_lines.begin(), log_lines.end(), visit_map_type(), combine_visits(), parse_log_line);}




第11章 多线程应用的测试和除错

    跟并发相关的错误主要分为多余的阻塞和条件竞争。多余的阻塞包括死锁、活锁(例如两自旋锁互相等待)、IO等外部阻塞。条件竞争包括数据竞争(对共享内存区域的并发访问未采取同步)、受到破坏的不变量、生存期问题。

    定位这些错误的技法包括审查代码并定位潜在错误、通过测试定位错误、设计可测试的代码、多线程测试技术(压力测试,组合模拟测试,特殊程序库)、测试多线程代码的性能。

    这里仅作简介,更详细的内容还需读原文。

EOF

你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节跳动电商等部门担任Linux C/C++后端研发工程师。

同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击此处查看我以前的学习笔记&经验&分享的资源。

我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。

欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会

加个微信,打开另一扇窗

C语言与CPP编程 C语言/C++开发,C语言/C++基础知识,C语言/C++学习路线,C语言/C++进阶,数据结构;算法;python;计算机基础等
评论
  • 晶台光耦KL817和KL3053在小家电产品(如微波炉等)辅助电源中的广泛应用。具备小功率、高性能、高度集成以及低待机功耗的特点,同时支持宽输入电压范围。▲光耦在实物应用中的产品图其一次侧集成了交流电压过零检测与信号输出功能,该功能产生的过零信号可用于精确控制继电器、可控硅等器件的过零开关动作,从而有效减小开关应力,显著提升器件的使用寿命。通过高度的集成化和先进的控制技术,该电源大幅减少了所需的外围器件数量,不仅降低了系统成本和体积,还进一步增强了整体的可靠性。▲电路示意图该电路的过零检测信号由
    晶台光耦 2025-01-16 10:12 89浏览
  • 实用性高值得收藏!! (时源芯微)时源专注于EMC整改与服务,配备完整器件 TVS全称Transient Voltage Suppre,亦称TVS管、瞬态抑制二极管等,有单向和双向之分。单向TVS 一般应用于直流供电电路,双向TVS 应用于电压交变的电路。在直流电路的应用中,TVS被并联接入电路中。在电路处于正常运行状态时,TVS会保持截止状态,从而不对电路的正常工作产生任何影响。然而,一旦电路中出现异常的过电压,并且这个电压达到TVS的击穿阈值时,TVS的状态就会
    时源芯微 2025-01-16 14:23 130浏览
  • 随着消费者对汽车驾乘体验的要求不断攀升,汽车照明系统作为确保道路安全、提升驾驶体验以及实现车辆与环境交互的重要组成,日益受到业界的高度重视。近日,2024 DVN(上海)国际汽车照明研讨会圆满落幕。作为照明与传感创新的全球领导者,艾迈斯欧司朗受邀参与主题演讲,并现场展示了其多项前沿技术。本届研讨会汇聚来自全球各地400余名汽车、照明、光源及Tier 2供应商的专业人士及专家共聚一堂。在研讨会第一环节中,艾迈斯欧司朗系统解决方案工程副总裁 Joachim Reill以深厚的专业素养,主持该环节多位
    艾迈斯欧司朗 2025-01-16 20:51 90浏览
  • 故障现象 一辆2007款法拉利599 GTB车,搭载6.0 L V12自然吸气发动机(图1),累计行驶里程约为6万km。该车因发动机故障灯异常点亮进厂检修。 图1 发动机的布置 故障诊断接车后试车,发动机怠速轻微抖动,发动机故障灯长亮。用故障检测仪检测,发现发动机控制单元(NCM)中存储有故障代码“P0300 多缸失火”“P0309 气缸9失火”“P0307 气缸7失火”,初步判断发动机存在失火故障。考虑到该车使用年数较长,决定先使用虹科Pico汽车示波器进行相对压缩测试,以
    虹科Pico汽车示波器 2025-01-15 17:30 90浏览
  • 日前,商务部等部门办公厅印发《手机、平板、智能手表(手环)购新补贴实施方案》明确,个人消费者购买手机、平板、智能手表(手环)3类数码产品(单件销售价格不超过6000元),可享受购新补贴。每人每类可补贴1件,每件补贴比例为减去生产、流通环节及移动运营商所有优惠后最终销售价格的15%,每件最高不超过500元。目前,京东已经做好了承接手机、平板等数码产品国补优惠的落地准备工作,未来随着各省市关于手机、平板等品类的国补开启,京东将第一时间率先上线,满足消费者的换新升级需求。为保障国补的真实有效发放,基于
    华尔街科技眼 2025-01-17 10:44 98浏览
  • 近期,智能家居领域Matter标准的制定者,全球最具影响力的科技联盟之一,连接标准联盟(Connectivity Standards Alliance,简称CSA)“利好”频出,不仅为智能家居领域的设备制造商们提供了更为快速便捷的Matter认证流程,而且苹果、三星与谷歌等智能家居平台厂商都表示会接纳CSA的Matter认证体系,并计划将其整合至各自的“Works with”项目中。那么,在本轮“利好”背景下,智能家居的设备制造商们该如何捉住机会,“掘金”万亿市场呢?重认证快通道计划,为家居设备
    华普微HOPERF 2025-01-16 10:22 157浏览
  • 全球领先的光学解决方案供应商艾迈斯欧司朗(SIX:AMS)近日宣布,与汽车技术领先者法雷奥合作,采用创新的开放系统协议(OSP)技术,旨在改变汽车内饰照明方式,革新汽车行业座舱照明理念。结合艾迈斯欧司朗开创性的OSIRE® E3731i智能LED和法雷奥的动态环境照明系统,两家公司将为车辆内饰设计和功能设立一套全新标准。汽车内饰照明的作用日益凸显,座舱设计的主流趋势应满足终端用户的需求:即易于使用、个性化,并能提供符合用户生活方式的清晰信息。因此,动态环境照明带来了众多新机遇。智能LED的应用已
    艾迈斯欧司朗 2025-01-15 19:00 74浏览
  • 随着智慧科技的快速发展,智能显示器的生态圈应用变得越来越丰富多元,智能显示器不仅仅是传统的显示设备,透过结合人工智能(AI)和语音助理,它还可以成为家庭、办公室和商业环境中的核心互动接口。提供多元且个性化的服务,如智能家居控制、影音串流拨放、实时信息显示等,极大提升了使用体验。此外,智能家居系统的整合能力也不容小觑,透过智能装置之间的无缝连接,形成了强大的多元应用生态圈。企业也利用智能显示器进行会议展示和多方远程合作,大大提高效率和互动性。Smart Display Ecosystem示意图,作
    百佳泰测试实验室 2025-01-16 15:37 148浏览
  • 百佳泰特为您整理2025年1月各大Logo的最新规格信息,本月有更新信息的logo有HDMI、Wi-Fi、Bluetooth、DisplayHDR、ClearMR、Intel EVO。HDMI®▶ 2025年1月6日,HDMI Forum, Inc. 宣布即将发布HDMI规范2.2版本。新规范将支持更高的分辨率和刷新率,并提供更多高质量选项。更快的96Gbps 带宽可满足数据密集型沉浸式和虚拟应用对传输的要求,如 AR/VR/MR、空间现实和光场显示,以及各种商业应用,如大型数字标牌、医疗成像和
    百佳泰测试实验室 2025-01-16 15:41 142浏览
  • 电竞鼠标应用环境与客户需求电竞行业近年来发展迅速,「鼠标延迟」已成为决定游戏体验与比赛结果的关键因素。从技术角度来看,传统鼠标的延迟大约为20毫秒,入门级电竞鼠标通常为5毫秒,而高阶电竞鼠标的延迟可降低至仅2毫秒。这些差异看似微小,但在竞技激烈的游戏中,尤其在对反应和速度要求极高的场景中,每一毫秒的优化都可能带来致胜的优势。电竞比赛的普及促使玩家更加渴望降低鼠标延迟以提升竞技表现。他们希望通过精确的测试,了解不同操作系统与设定对延迟的具体影响,并寻求最佳配置方案来获得竞技优势。这样的需求推动市场
    百佳泰测试实验室 2025-01-16 15:45 197浏览
  • 一个易用且轻量化的UI可以大大提高用户的使用效率和满意度——通过快速启动、直观操作和及时反馈,帮助用户快速上手并高效完成任务;轻量化设计则可以减少资源占用,提升启动和运行速度,增强产品竞争力。LVGL(Light and Versatile Graphics Library)是一个免费开源的图形库,专为嵌入式系统设计。它以轻量级、高效和易于使用而著称,支持多种屏幕分辨率和硬件配置,并提供了丰富的GUI组件,能够帮助开发者轻松构建出美观且功能强大的用户界面。近期,飞凌嵌入式为基于NXP i.MX9
    飞凌嵌入式 2025-01-16 13:15 158浏览
  • 80,000人到访的国际大展上,艾迈斯欧司朗有哪些亮点?感未来,光无限。近日,在慕尼黑electronica 2024现场,ams OSRAM通过多款创新DEMO展示,以及数场前瞻洞察分享,全面展示自身融合传感器、发射器及集成电路技术,精准捕捉并呈现环境信息的卓越能力。同时,ams OSRAM通过展会期间与客户、用户等行业人士,以及媒体朋友的深度交流,向业界传达其以光电技术为笔、以创新为墨,书写智能未来的深度思考。electronica 2024electronica 2024构建了一个高度国际
    艾迈斯欧司朗 2025-01-16 20:45 109浏览
我要评论
0
点击右上角,分享到朋友圈 我知道啦
请使用浏览器分享功能 我知道啦